This is an automated email from the ASF dual-hosted git repository.
shibd pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new ce3dfa8 feat: add v2 APIs to create Consumer, Reader or TableView
(#581)
ce3dfa8 is described below
commit ce3dfa814fbc1aaddfe4f8c61f3b1d7954c99462
Author: Yunze Xu <[email protected]>
AuthorDate: Thu May 28 14:23:48 2026 +0800
feat: add v2 APIs to create Consumer, Reader or TableView (#581)
* feat: add v2 APIs to create Consumer, Reader or TableView
* improve tests for other v2 methods
* fix lint
---
include/pulsar/Client.h | 36 +++++++++
lib/Client.cc | 91 ++++++++++++++++++----
lib/ClientImpl.cc | 195 +++++++++++++++++++++++++++++++++---------------
lib/ClientImpl.h | 30 ++++++--
lib/TableViewImpl.cc | 20 ++---
lib/TableViewImpl.h | 4 +-
tests/AuthTokenTest.cc | 21 ++++--
7 files changed, 296 insertions(+), 101 deletions(-)
diff --git a/include/pulsar/Client.h b/include/pulsar/Client.h
index 4114075..63a2515 100644
--- a/include/pulsar/Client.h
+++ b/include/pulsar/Client.h
@@ -36,7 +36,9 @@
#include <memory>
#include <string>
+#include <utility>
#include <variant>
+#include <vector>
namespace pulsar {
typedef std::function<void(Result, Producer)> CreateProducerCallback;
@@ -47,6 +49,21 @@ typedef std::function<void(Result, const
std::vector<std::string>&)> GetPartitio
typedef std::function<void(Result)> CloseCallback;
using CreateProducerV2Callback = std::function<void(std::variant<Error,
Producer>)>;
+using CreateConsumerV2Callback = std::function<void(std::variant<Error,
Consumer>)>;
+using SubscribeV2Callback = CreateConsumerV2Callback;
+using ReaderV2Callback = std::function<void(std::variant<Error, Reader>)>;
+using TableViewV2Callback = std::function<void(std::variant<Error,
TableView>)>;
+
+/**
+ * Use TopicRegex with subscribeV2/subscribeAsyncV2 to distinguish a regex
pattern from a single topic name.
+ */
+struct TopicRegex {
+ explicit TopicRegex(std::string pattern) : pattern(std::move(pattern)) {}
+
+ std::string pattern;
+};
+
+using SubscribeTopics = std::variant<std::string, std::vector<std::string>,
TopicRegex>;
class ClientImpl;
class PulsarFriend;
@@ -188,6 +205,13 @@ class PULSAR_PUBLIC Client {
void subscribeAsync(const std::string& topic, const std::string&
subscriptionName,
const ConsumerConfiguration& conf, const
SubscribeCallback& callback);
+ void subscribeAsyncV2(const SubscribeTopics& topics, const std::string&
subscriptionName,
+ const ConsumerConfiguration& conf,
SubscribeV2Callback callback);
+
+ std::variant<Error, Consumer> subscribeV2(const SubscribeTopics& topics,
+ const std::string&
subscriptionName,
+ const ConsumerConfiguration&
conf);
+
/**
* Subscribe to multiple topics under the same namespace.
*
@@ -332,6 +356,12 @@ class PULSAR_PUBLIC Client {
void createReaderAsync(const std::string& topic, const MessageId&
startMessageId,
const ReaderConfiguration& conf, const
ReaderCallback& callback);
+ void createReaderAsyncV2(const std::string& topic, const MessageId&
startMessageId,
+ const ReaderConfiguration& conf, ReaderV2Callback
callback);
+
+ std::variant<Error, Reader> createReaderV2(const std::string& topic, const
MessageId& startMessageId,
+ const ReaderConfiguration&
conf);
+
/**
* Create a table view with given {@code TableViewConfiguration} for
specified topic.
*
@@ -362,6 +392,12 @@ class PULSAR_PUBLIC Client {
void createTableViewAsync(const std::string& topic, const
TableViewConfiguration& conf,
const TableViewCallback& callBack);
+ void createTableViewAsyncV2(const std::string& topic, const
TableViewConfiguration& conf,
+ TableViewV2Callback callback);
+
+ std::variant<Error, TableView> createTableViewV2(const std::string& topic,
+ const
TableViewConfiguration& conf);
+
/**
* Get the list of partitions for a given topic.
*
diff --git a/lib/Client.cc b/lib/Client.cc
index 32ab87c..11dfe26 100644
--- a/lib/Client.cc
+++ b/lib/Client.cc
@@ -26,15 +26,35 @@
#include "ClientImpl.h"
#include "Int64SerDes.h"
-#include "LogUtils.h"
#include "LookupService.h"
#include "TopicName.h"
#include "Utils.h"
-DECLARE_LOG_OBJECT()
-
namespace pulsar {
+namespace {
+
+template <typename T>
+void setPromiseValue(std::promise<std::variant<Error, T>>& promise, const
std::variant<Error, T>& value) {
+ if (const auto* error = std::get_if<Error>(&value)) {
+ promise.set_value(*error);
+ } else {
+ promise.set_value(std::get<T>(value));
+ }
+}
+
+template <typename T>
+void invokeLegacyCallback(const std::function<void(Result, T)>& callback,
+ const std::variant<Error, T>& value) {
+ if (const auto* error = std::get_if<Error>(&value)) {
+ callback(error->result, T());
+ } else {
+ callback(ResultOk, std::get<T>(value));
+ }
+}
+
+} // namespace
+
Client::Client(const std::shared_ptr<ClientImpl>& impl) : impl_(impl) {
impl_->initialize(); }
Client::Client(const std::string& serviceUrl) : Client(serviceUrl,
ClientConfiguration()) {}
@@ -83,13 +103,8 @@ void Client::createProducerAsyncV2(const std::string&
topic, const ProducerConfi
std::variant<Error, Producer> Client::createProducerV2(const std::string&
topic,
const
ProducerConfiguration& conf) {
std::promise<std::variant<Error, Producer>> promise;
- createProducerAsyncV2(topic, conf, [&promise](const auto& v) mutable {
- if (const auto* error = std::get_if<Error>(&v)) {
- promise.set_value(*error);
- } else {
- promise.set_value(std::get<Producer>(v));
- }
- });
+ createProducerAsyncV2(topic, conf,
+ [&promise](const auto& v) mutable {
setPromiseValue<Producer>(promise, v); });
return promise.get_future().get();
}
@@ -113,8 +128,22 @@ void Client::subscribeAsync(const std::string& topic,
const std::string& subscri
void Client::subscribeAsync(const std::string& topic, const std::string&
subscriptionName,
const ConsumerConfiguration& conf, const
SubscribeCallback& callback) {
- LOG_INFO("Subscribing on Topic :" << topic);
- impl_->subscribeAsync(topic, subscriptionName, conf, callback);
+ subscribeAsyncV2(topic, subscriptionName, conf,
+ [callback](const auto& value) {
invokeLegacyCallback<Consumer>(callback, value); });
+}
+
+void Client::subscribeAsyncV2(const SubscribeTopics& topics, const
std::string& subscriptionName,
+ const ConsumerConfiguration& conf,
SubscribeV2Callback callback) {
+ impl_->subscribeAsyncV2(topics, subscriptionName, conf,
std::move(callback));
+}
+
+std::variant<Error, Consumer> Client::subscribeV2(const SubscribeTopics&
topics,
+ const std::string&
subscriptionName,
+ const ConsumerConfiguration&
conf) {
+ std::promise<std::variant<Error, Consumer>> promise;
+ subscribeAsyncV2(topics, subscriptionName, conf,
+ [&promise](const auto& v) mutable {
setPromiseValue<Consumer>(promise, v); });
+ return promise.get_future().get();
}
Result Client::subscribe(const std::vector<std::string>& topics, const
std::string& subscriptionName,
@@ -138,7 +167,8 @@ void Client::subscribeAsync(const std::vector<std::string>&
topics, const std::s
void Client::subscribeAsync(const std::vector<std::string>& topics, const
std::string& subscriptionName,
const ConsumerConfiguration& conf, const
SubscribeCallback& callback) {
- impl_->subscribeAsync(topics, subscriptionName, conf, callback);
+ subscribeAsyncV2(topics, subscriptionName, conf,
+ [callback](const auto& value) {
invokeLegacyCallback<Consumer>(callback, value); });
}
Result Client::subscribeWithRegex(const std::string& regexPattern, const
std::string& subscriptionName,
@@ -162,7 +192,8 @@ void Client::subscribeWithRegexAsync(const std::string&
regexPattern, const std:
void Client::subscribeWithRegexAsync(const std::string& regexPattern, const
std::string& subscriptionName,
const ConsumerConfiguration& conf, const
SubscribeCallback& callback) {
- impl_->subscribeWithRegexAsync(regexPattern, subscriptionName, conf,
callback);
+ subscribeAsyncV2(TopicRegex{regexPattern}, subscriptionName, conf,
+ [callback](const auto& value) {
invokeLegacyCallback<Consumer>(callback, value); });
}
Result Client::createReader(const std::string& topic, const MessageId&
startMessageId,
@@ -176,7 +207,21 @@ Result Client::createReader(const std::string& topic,
const MessageId& startMess
void Client::createReaderAsync(const std::string& topic, const MessageId&
startMessageId,
const ReaderConfiguration& conf, const
ReaderCallback& callback) {
- impl_->createReaderAsync(topic, startMessageId, conf, callback);
+ createReaderAsyncV2(topic, startMessageId, conf,
+ [callback](const auto& value) {
invokeLegacyCallback<Reader>(callback, value); });
+}
+
+void Client::createReaderAsyncV2(const std::string& topic, const MessageId&
startMessageId,
+ const ReaderConfiguration& conf,
ReaderV2Callback callback) {
+ impl_->createReaderAsyncV2(topic, startMessageId, conf,
std::move(callback));
+}
+
+std::variant<Error, Reader> Client::createReaderV2(const std::string& topic,
const MessageId& startMessageId,
+ const ReaderConfiguration&
conf) {
+ std::promise<std::variant<Error, Reader>> promise;
+ createReaderAsyncV2(topic, startMessageId, conf,
+ [&promise](const auto& v) mutable {
setPromiseValue<Reader>(promise, v); });
+ return promise.get_future().get();
}
Result Client::createTableView(const std::string& topic, const
TableViewConfiguration& conf,
@@ -190,7 +235,21 @@ Result Client::createTableView(const std::string& topic,
const TableViewConfigur
void Client::createTableViewAsync(const std::string& topic, const
TableViewConfiguration& conf,
const TableViewCallback& callback) {
- impl_->createTableViewAsync(topic, conf, callback);
+ createTableViewAsyncV2(
+ topic, conf, [callback](const auto& value) {
invokeLegacyCallback<TableView>(callback, value); });
+}
+
+void Client::createTableViewAsyncV2(const std::string& topic, const
TableViewConfiguration& conf,
+ TableViewV2Callback callback) {
+ impl_->createTableViewAsyncV2(topic, conf, std::move(callback));
+}
+
+std::variant<Error, TableView> Client::createTableViewV2(const std::string&
topic,
+ const
TableViewConfiguration& conf) {
+ std::promise<std::variant<Error, TableView>> promise;
+ createTableViewAsyncV2(topic, conf,
+ [&promise](const auto& v) mutable {
setPromiseValue<TableView>(promise, v); });
+ return promise.get_future().get();
}
Result Client::getPartitionsForTopic(const std::string& topic,
std::vector<std::string>& partitions) {
diff --git a/lib/ClientImpl.cc b/lib/ClientImpl.cc
index 1e4f022..dc79fce 100644
--- a/lib/ClientImpl.cc
+++ b/lib/ClientImpl.cc
@@ -62,6 +62,20 @@ DECLARE_LOG_OBJECT()
namespace pulsar {
+namespace {
+
+template <typename T>
+void invokeLegacyCallback(const std::function<void(Result, T)>& callback,
+ const std::variant<Error, T>& value) {
+ if (const auto* error = std::get_if<Error>(&value)) {
+ callback(error->result, T());
+ } else {
+ callback(ResultOk, std::get<T>(value));
+ }
+}
+
+} // namespace
+
static const char hexDigits[] = {'0', '1', '2', '3', '4', '5', '6', '7',
'8', '9', 'a', 'b', 'c', 'd', 'e', 'f'};
static std::uniform_int_distribution<> hexDigitsDist(0, sizeof(hexDigits) - 1);
@@ -286,72 +300,93 @@ void ClientImpl::handleProducerCreated(const Error&
error, const ProducerImplBas
void ClientImpl::createReaderAsync(const std::string& topic, const MessageId&
startMessageId,
const ReaderConfiguration& conf, const
ReaderCallback& callback) {
+ createReaderAsyncV2(topic, startMessageId, conf,
+ [callback](const auto& value) {
invokeLegacyCallback<Reader>(callback, value); });
+}
+
+void ClientImpl::createReaderAsyncV2(const std::string& topic, const
MessageId& startMessageId,
+ const ReaderConfiguration& conf,
ReaderV2Callback callback) {
TopicNamePtr topicName;
{
std::shared_lock lock(mutex_);
if (state_ != Open) {
lock.unlock();
- callback(ResultAlreadyClosed, Reader());
+ callback(Error{ResultAlreadyClosed, ""});
return;
} else if (!(topicName = TopicName::get(topic))) {
lock.unlock();
- callback(ResultInvalidTopicName, Reader());
+ callback(Error{ResultInvalidTopicName, ""});
return;
}
}
- getPartitionMetadataAsync(topicName).addListener([this,
self{shared_from_this()}, topicName,
- startMessageId, conf,
- callback](const auto&
error, const auto& metadata) {
- handleReaderMetadataLookup(error.result, metadata, topicName,
startMessageId, conf, callback);
- });
+ getPartitionMetadataAsync(topicName).addListener(
+ [this, self{shared_from_this()}, topicName, startMessageId, conf,
callback{std::move(callback)}](
+ const auto& error, const auto& metadata) {
+ handleReaderMetadataLookup(error, metadata, topicName,
startMessageId, conf, callback);
+ });
}
void ClientImpl::createTableViewAsync(const std::string& topic, const
TableViewConfiguration& conf,
const TableViewCallback& callback) {
+ createTableViewAsyncV2(
+ topic, conf, [callback](const auto& value) {
invokeLegacyCallback<TableView>(callback, value); });
+}
+
+void ClientImpl::createTableViewAsyncV2(const std::string& topic, const
TableViewConfiguration& conf,
+ TableViewV2Callback callback) {
TopicNamePtr topicName;
{
std::shared_lock lock(mutex_);
if (state_ != Open) {
lock.unlock();
- callback(ResultAlreadyClosed, TableView());
+ callback(Error{ResultAlreadyClosed, ""});
return;
} else if (!(topicName = TopicName::get(topic))) {
lock.unlock();
- callback(ResultInvalidTopicName, TableView());
+ callback(Error{ResultInvalidTopicName, ""});
return;
}
}
TableViewImplPtr tableViewPtr =
std::make_shared<TableViewImpl>(shared_from_this(),
topicName->toString(), conf);
- tableViewPtr->start().addListener([callback](Result result, const
TableViewImplPtr& tableViewImplPtr) {
- if (result == ResultOk) {
- callback(result, TableView{tableViewImplPtr});
- } else {
- callback(result, {});
- }
- });
+ tableViewPtr->start().addListener(
+ [callback{std::move(callback)}](const Error& error, const
TableViewImplPtr& tableViewImplPtr) {
+ if (error.result == ResultOk) {
+ callback(TableView{tableViewImplPtr});
+ } else {
+ callback(error);
+ }
+ });
}
-void ClientImpl::handleReaderMetadataLookup(Result result, const
LookupDataResultPtr& partitionMetadata,
+void ClientImpl::handleReaderMetadataLookup(const Error& error, const
LookupDataResultPtr& partitionMetadata,
const TopicNamePtr& topicName,
const MessageId& startMessageId,
- const ReaderConfiguration& conf,
const ReaderCallback& callback) {
- if (result != ResultOk) {
+ const ReaderConfiguration& conf,
+ const ReaderV2Callback& callback) {
+ if (error.result != ResultOk) {
LOG_ERROR("Error Checking/Getting Partition Metadata while creating
readeron "
- << topicName->toString() << " -- " << result);
- callback(result, Reader());
+ << topicName->toString() << " -- " << error.result);
+ callback(error);
return;
}
ReaderImplPtr reader;
try {
+ ReaderCallback readerCreatedCallback = [callback](Result result, const
Reader& reader) {
+ if (result == ResultOk) {
+ callback(reader);
+ } else {
+ callback(Error{result, ""});
+ }
+ };
reader.reset(new ReaderImpl(shared_from_this(), topicName->toString(),
partitionMetadata->getPartitions(), conf,
- getListenerExecutorProvider()->get(),
callback));
+ getListenerExecutorProvider()->get(),
readerCreatedCallback));
} catch (const std::runtime_error& e) {
LOG_ERROR("Failed to create reader: " << e.what());
- callback(ResultConnectError, {});
+ callback(Error{ResultConnectError, e.what()});
return;
}
ConsumerImplBasePtr consumer = reader->getConsumer();
@@ -375,18 +410,34 @@ void ClientImpl::handleReaderMetadataLookup(Result
result, const LookupDataResul
void ClientImpl::subscribeWithRegexAsync(const std::string& regexPattern,
const std::string& subscriptionName,
const ConsumerConfiguration& conf,
const SubscribeCallback& callback) {
+ subscribeAsyncV2(TopicRegex{regexPattern}, subscriptionName, conf,
+ [callback](const auto& value) {
invokeLegacyCallback<Consumer>(callback, value); });
+}
+
+void ClientImpl::subscribeAsyncV2(const SubscribeTopics& topics, const
std::string& subscriptionName,
+ const ConsumerConfiguration& conf,
SubscribeV2Callback callback) {
+ std::visit(
+ [this, &subscriptionName, &conf, callback{std::move(callback)}](const
auto& topicSpec) mutable {
+ subscribeToTopicsAsyncV2(topicSpec, subscriptionName, conf,
std::move(callback));
+ },
+ topics);
+}
+
+void ClientImpl::subscribeToTopicsAsyncV2(const TopicRegex& topicRegex, const
std::string& subscriptionName,
+ const ConsumerConfiguration& conf,
SubscribeV2Callback callback) {
+ const auto regexPattern = topicRegex.pattern;
TopicNamePtr topicNamePtr = TopicName::get(regexPattern);
std::shared_lock lock(mutex_);
if (state_ != Open) {
lock.unlock();
- callback(ResultAlreadyClosed, Consumer());
+ callback(Error{ResultAlreadyClosed, ""});
return;
} else {
lock.unlock();
if (!topicNamePtr) {
LOG_ERROR("Topic pattern not valid: " << regexPattern);
- callback(ResultInvalidTopicName, Consumer());
+ callback(Error{ResultInvalidTopicName, ""});
return;
}
}
@@ -411,14 +462,16 @@ void ClientImpl::subscribeWithRegexAsync(const
std::string& regexPattern, const
break;
default:
LOG_ERROR("RegexSubscriptionMode not valid: " <<
regexSubscriptionMode);
- callback(ResultInvalidConfiguration, Consumer());
+ callback(Error{ResultInvalidConfiguration, ""});
return;
}
getTopicsOfNamespaceAsync(topicNamePtr->getNamespaceName(), mode)
- .addListener(std::bind(&ClientImpl::createPatternMultiTopicsConsumer,
shared_from_this(),
- std::placeholders::_1, std::placeholders::_2,
regexPattern, mode,
- subscriptionName, conf, callback));
+ .addListener([self{shared_from_this()}, regexPattern, mode,
subscriptionName, conf,
+ callback{std::move(callback)}](Result result, const
NamespaceTopicsPtr& topics) {
+ self->createPatternMultiTopicsConsumer(result, topics,
regexPattern, mode, subscriptionName, conf,
+ callback);
+ });
}
void ClientImpl::createPatternMultiTopicsConsumer(Result result, const
NamespaceTopicsPtr& topics,
@@ -426,7 +479,7 @@ void ClientImpl::createPatternMultiTopicsConsumer(Result
result, const Namespace
CommandGetTopicsOfNamespace_Mode mode,
const std::string&
subscriptionName,
const ConsumerConfiguration&
conf,
- const SubscribeCallback&
callback) {
+ SubscribeV2Callback
callback) {
if (result == ResultOk) {
ConsumerImplBasePtr consumer;
@@ -441,18 +494,27 @@ void ClientImpl::createPatternMultiTopicsConsumer(Result
result, const Namespace
shared_from_this(), regexPattern, mode, *matchTopics,
subscriptionName, conf, interceptors);
consumer->getConsumerCreatedFuture().addListener(
- std::bind(&ClientImpl::handleConsumerCreated, shared_from_this(),
std::placeholders::_1,
- std::placeholders::_2, callback, consumer));
+ [self{shared_from_this()}, callback{std::move(callback)},
consumer](
+ Result result, const ConsumerImplBaseWeakPtr&
consumerImplBaseWeakPtr) {
+ self->handleConsumerCreated(result, consumerImplBaseWeakPtr,
callback, consumer);
+ });
consumer->start();
} else {
LOG_ERROR("Error Getting topicsOfNameSpace while
createPatternMultiTopicsConsumer: " << result);
- callback(result, Consumer());
+ callback(Error{result, ""});
}
}
void ClientImpl::subscribeAsync(const std::vector<std::string>& originalTopics,
const std::string& subscriptionName, const
ConsumerConfiguration& conf,
const SubscribeCallback& callback) {
+ subscribeAsyncV2(originalTopics, subscriptionName, conf,
+ [callback](const auto& value) {
invokeLegacyCallback<Consumer>(callback, value); });
+}
+
+void ClientImpl::subscribeToTopicsAsyncV2(const std::vector<std::string>&
originalTopics,
+ const std::string& subscriptionName,
+ const ConsumerConfiguration& conf,
SubscribeV2Callback callback) {
TopicNamePtr topicNamePtr;
// Remove duplicates from the list of topics
@@ -464,12 +526,12 @@ void ClientImpl::subscribeAsync(const
std::vector<std::string>& originalTopics,
std::shared_lock lock(mutex_);
if (state_ != Open) {
lock.unlock();
- callback(ResultAlreadyClosed, Consumer());
+ callback(Error{ResultAlreadyClosed, ""});
return;
} else {
if (!topics.empty() && !(topicNamePtr =
MultiTopicsConsumerImpl::topicNamesValid(topics))) {
lock.unlock();
- callback(ResultInvalidTopicName, Consumer());
+ callback(Error{ResultInvalidTopicName, ""});
return;
}
}
@@ -487,45 +549,54 @@ void ClientImpl::subscribeAsync(const
std::vector<std::string>& originalTopics,
ConsumerImplBasePtr consumer = std::make_shared<MultiTopicsConsumerImpl>(
shared_from_this(), topics, subscriptionName, topicNamePtr, conf,
interceptors);
-
consumer->getConsumerCreatedFuture().addListener(std::bind(&ClientImpl::handleConsumerCreated,
-
shared_from_this(), std::placeholders::_1,
-
std::placeholders::_2, callback, consumer));
+ consumer->getConsumerCreatedFuture().addListener(
+ [self{shared_from_this()}, callback{std::move(callback)}, consumer](
+ Result result, const ConsumerImplBaseWeakPtr&
consumerImplBaseWeakPtr) {
+ self->handleConsumerCreated(result, consumerImplBaseWeakPtr,
callback, consumer);
+ });
consumer->start();
}
void ClientImpl::subscribeAsync(const std::string& topic, const std::string&
subscriptionName,
const ConsumerConfiguration& conf, const
SubscribeCallback& callback) {
+ subscribeAsyncV2(topic, subscriptionName, conf,
+ [callback](const auto& value) {
invokeLegacyCallback<Consumer>(callback, value); });
+}
+
+void ClientImpl::subscribeToTopicsAsyncV2(const std::string& topic, const
std::string& subscriptionName,
+ const ConsumerConfiguration& conf,
SubscribeV2Callback callback) {
+ LOG_INFO("Subscribing on Topic :" << topic);
TopicNamePtr topicName;
{
std::shared_lock lock(mutex_);
if (state_ != Open) {
lock.unlock();
- callback(ResultAlreadyClosed, Consumer());
+ callback(Error{ResultAlreadyClosed, ""});
return;
} else if (!(topicName = TopicName::get(topic))) {
lock.unlock();
- callback(ResultInvalidTopicName, Consumer());
+ callback(Error{ResultInvalidTopicName, ""});
return;
} else if (conf.isReadCompacted() &&
(topicName->getDomain().compare("persistent") != 0 ||
(conf.getConsumerType() !=
ConsumerExclusive &&
conf.getConsumerType() !=
ConsumerFailover))) {
lock.unlock();
- callback(ResultInvalidConfiguration, Consumer());
+ callback(Error{ResultInvalidConfiguration, ""});
return;
}
}
- getPartitionMetadataAsync(topicName).addListener([this,
self{shared_from_this()}, topicName,
- subscriptionName, conf,
- callback](const auto&
error, const auto& metadata) {
- handleSubscribe(error.result, metadata, topicName, subscriptionName,
conf, callback);
- });
+ getPartitionMetadataAsync(topicName).addListener(
+ [this, self{shared_from_this()}, topicName, subscriptionName, conf,
callback{std::move(callback)}](
+ const auto& error, const auto& metadata) {
+ handleSubscribe(error, metadata, topicName, subscriptionName,
conf, callback);
+ });
}
-void ClientImpl::handleSubscribe(Result result, const LookupDataResultPtr&
partitionMetadata,
+void ClientImpl::handleSubscribe(const Error& error, const
LookupDataResultPtr& partitionMetadata,
const TopicNamePtr& topicName, const
std::string& subscriptionName,
- ConsumerConfiguration conf, const
SubscribeCallback& callback) {
- if (result == ResultOk) {
+ ConsumerConfiguration conf,
SubscribeV2Callback callback) {
+ if (error.result == ResultOk) {
// generate random name if not supplied by the customer.
if (conf.getConsumerName().empty()) {
conf.setConsumerName(generateRandomName());
@@ -537,7 +608,7 @@ void ClientImpl::handleSubscribe(Result result, const
LookupDataResultPtr& parti
if (partitionMetadata->getPartitions() > 0) {
if (conf.getReceiverQueueSize() == 0) {
LOG_ERROR("Can't use partitioned topic if the queue size
is 0.");
- callback(ResultInvalidConfiguration, Consumer());
+ callback(Error{ResultInvalidConfiguration, ""});
return;
}
consumer =
std::make_shared<MultiTopicsConsumerImpl>(shared_from_this(), topicName,
@@ -552,22 +623,24 @@ void ClientImpl::handleSubscribe(Result result, const
LookupDataResultPtr& parti
}
} catch (const std::runtime_error& e) {
LOG_ERROR("Failed to create consumer: " << e.what());
- callback(ResultConnectError, {});
+ callback(Error{ResultConnectError, e.what()});
return;
}
consumer->getConsumerCreatedFuture().addListener(
- std::bind(&ClientImpl::handleConsumerCreated, shared_from_this(),
std::placeholders::_1,
- std::placeholders::_2, callback, consumer));
+ [self{shared_from_this()}, callback{std::move(callback)},
consumer](
+ Result result, const ConsumerImplBaseWeakPtr&
consumerImplBaseWeakPtr) {
+ self->handleConsumerCreated(result, consumerImplBaseWeakPtr,
callback, consumer);
+ });
consumer->start();
} else {
- LOG_ERROR("Error Checking/Getting Partition Metadata while Subscribing
on " << topicName->toString()
-
<< " -- " << result);
- callback(result, Consumer());
+ LOG_ERROR("Error Checking/Getting Partition Metadata while Subscribing
on "
+ << topicName->toString() << " -- " << error.result);
+ callback(error);
}
}
void ClientImpl::handleConsumerCreated(Result result, const
ConsumerImplBaseWeakPtr& consumerImplBaseWeakPtr,
- const SubscribeCallback& callback,
+ const SubscribeV2Callback& callback,
const ConsumerImplBasePtr& consumer) {
if (result == ResultOk) {
auto address = consumer.get();
@@ -576,18 +649,18 @@ void ClientImpl::handleConsumerCreated(Result result,
const ConsumerImplBaseWeak
auto consumer = existingConsumer.value().lock();
LOG_ERROR("Unexpected existing consumer at the same address: "
<< address << ", consumer: " << (consumer ?
consumer->getName() : "(null)"));
- callback(ResultUnknownError, {});
+ callback(Error{ResultUnknownError, ""});
return;
}
- callback(result, Consumer(consumer));
+ callback(Consumer(consumer));
} else {
// In order to be compatible with the current broker error code
confusion.
//
https://github.com/apache/pulsar/blob/cd2aa550d0fe4e72b5ff88c4f6c1c2795b3ff2cd/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java#L240-L241
if (result == ResultProducerBusy) {
LOG_ERROR("Failed to create consumer: SubscriptionName cannot be
empty.");
- callback(ResultInvalidConfiguration, {});
+ callback(Error{ResultInvalidConfiguration, "SubscriptionName
cannot be empty."});
} else {
- callback(result, {});
+ callback(Error{result, ""});
}
}
}
diff --git a/lib/ClientImpl.h b/lib/ClientImpl.h
index c046d8e..cdabf1f 100644
--- a/lib/ClientImpl.h
+++ b/lib/ClientImpl.h
@@ -98,6 +98,9 @@ class ClientImpl : public
std::enable_shared_from_this<ClientImpl> {
void subscribeAsync(const std::string& topic, const std::string&
subscriptionName,
const ConsumerConfiguration& conf, const
SubscribeCallback& callback);
+ void subscribeAsyncV2(const SubscribeTopics& topics, const std::string&
subscriptionName,
+ const ConsumerConfiguration& conf,
SubscribeV2Callback callback);
+
void subscribeAsync(const std::vector<std::string>& topics, const
std::string& subscriptionName,
const ConsumerConfiguration& conf, const
SubscribeCallback& callback);
@@ -107,9 +110,15 @@ class ClientImpl : public
std::enable_shared_from_this<ClientImpl> {
void createReaderAsync(const std::string& topic, const MessageId&
startMessageId,
const ReaderConfiguration& conf, const
ReaderCallback& callback);
+ void createReaderAsyncV2(const std::string& topic, const MessageId&
startMessageId,
+ const ReaderConfiguration& conf, ReaderV2Callback
callback);
+
void createTableViewAsync(const std::string& topic, const
TableViewConfiguration& conf,
const TableViewCallback& callback);
+ void createTableViewAsyncV2(const std::string& topic, const
TableViewConfiguration& conf,
+ TableViewV2Callback callback);
+
void getPartitionsForTopicAsync(const std::string& topic, const
GetPartitionsCallback& callback);
// Use virtual method to test
@@ -175,13 +184,22 @@ class ClientImpl : public
std::enable_shared_from_this<ClientImpl> {
const TopicNamePtr& topicName, const
ProducerConfiguration& conf,
CreateProducerV2Callback callback);
- void handleSubscribe(Result result, const LookupDataResultPtr&
partitionMetadata,
+ void handleSubscribe(const Error& error, const LookupDataResultPtr&
partitionMetadata,
const TopicNamePtr& topicName, const std::string&
consumerName,
- ConsumerConfiguration conf, const SubscribeCallback&
callback);
+ ConsumerConfiguration conf, SubscribeV2Callback
callback);
+
+ void subscribeToTopicsAsyncV2(const std::string& topic, const std::string&
subscriptionName,
+ const ConsumerConfiguration& conf,
SubscribeV2Callback callback);
+
+ void subscribeToTopicsAsyncV2(const std::vector<std::string>& topics,
const std::string& subscriptionName,
+ const ConsumerConfiguration& conf,
SubscribeV2Callback callback);
+
+ void subscribeToTopicsAsyncV2(const TopicRegex& topicRegex, const
std::string& subscriptionName,
+ const ConsumerConfiguration& conf,
SubscribeV2Callback callback);
- void handleReaderMetadataLookup(Result result, const LookupDataResultPtr&
partitionMetadata,
+ void handleReaderMetadataLookup(const Error& error, const
LookupDataResultPtr& partitionMetadata,
const TopicNamePtr& topicName, const
MessageId& startMessageId,
- const ReaderConfiguration& conf, const
ReaderCallback& callback);
+ const ReaderConfiguration& conf, const
ReaderV2Callback& callback);
void handleGetPartitions(Result result, const LookupDataResultPtr&
partitionMetadata,
const TopicNamePtr& topicName, const
GetPartitionsCallback& callback);
@@ -189,7 +207,7 @@ class ClientImpl : public
std::enable_shared_from_this<ClientImpl> {
void handleProducerCreated(const Error& error, const
ProducerImplBaseWeakPtr& producerWeakPtr,
const CreateProducerV2Callback& callback, const
ProducerImplBasePtr& producer);
void handleConsumerCreated(Result result, const ConsumerImplBaseWeakPtr&
consumerWeakPtr,
- const SubscribeCallback& callback, const
ConsumerImplBasePtr& consumer);
+ const SubscribeV2Callback& callback, const
ConsumerImplBasePtr& consumer);
typedef std::shared_ptr<int> SharedInt;
@@ -199,7 +217,7 @@ class ClientImpl : public
std::enable_shared_from_this<ClientImpl> {
const std::string& regexPattern,
CommandGetTopicsOfNamespace_Mode
mode,
const std::string& consumerName,
const ConsumerConfiguration& conf,
- const SubscribeCallback& callback);
+ SubscribeV2Callback callback);
const std::string& getPhysicalAddress(const std::string&
redirectedClusterURI,
const std::string& logicalAddress);
diff --git a/lib/TableViewImpl.cc b/lib/TableViewImpl.cc
index f634fa5..686cd55 100644
--- a/lib/TableViewImpl.cc
+++ b/lib/TableViewImpl.cc
@@ -32,23 +32,23 @@ TableViewImpl::TableViewImpl(const ClientImplPtr& client,
const std::string& top
const TableViewConfiguration& conf)
: client_(client), topic_(topic), conf_(conf) {}
-Future<Result, TableViewImplPtr> TableViewImpl::start() {
- Promise<Result, TableViewImplPtr> promise;
+Future<Error, TableViewImplPtr> TableViewImpl::start() {
+ Promise<Error, TableViewImplPtr> promise;
ReaderConfiguration readerConfiguration;
readerConfiguration.setSchema(conf_.schemaInfo);
readerConfiguration.setReadCompacted(true);
readerConfiguration.setInternalSubscriptionName(conf_.subscriptionName);
TableViewImplPtr self = shared_from_this();
- ReaderCallback readerCallback = [self, promise](Result res, const Reader&
reader) {
- if (res == ResultOk) {
- self->reader_ = reader.impl_;
+ ReaderV2Callback readerCallback = [self, promise](const
std::variant<Error, Reader>& result) {
+ if (const auto* reader = std::get_if<Reader>(&result)) {
+ self->reader_ = reader->impl_;
self->readAllExistingMessages(promise,
TimeUtils::currentTimeMillis(), 0);
} else {
- promise.setFailed(res);
+ promise.setFailed(std::get<Error>(result));
}
};
- client_->createReaderAsync(topic_, MessageId::earliest(),
readerConfiguration, readerCallback);
+ client_->createReaderAsyncV2(topic_, MessageId::earliest(),
readerConfiguration, readerCallback);
return promise.getFuture();
}
@@ -118,14 +118,14 @@ void TableViewImpl::handleMessage(const Message& msg) {
}
}
-void TableViewImpl::readAllExistingMessages(const Promise<Result,
TableViewImplPtr>& promise, long startTime,
+void TableViewImpl::readAllExistingMessages(const Promise<Error,
TableViewImplPtr>& promise, long startTime,
long messagesRead) {
auto weakSelf = weak_from_this();
reader_->hasMessageAvailableAsync(
[weakSelf, promise, startTime, messagesRead](Result result, bool
hasMessage) {
auto self = weakSelf.lock();
if (!self || result != ResultOk) {
- promise.setFailed(result);
+ promise.setFailed(Error{result, ""});
return;
}
if (hasMessage) {
@@ -135,7 +135,7 @@ void TableViewImpl::readAllExistingMessages(const
Promise<Result, TableViewImplP
[weakSelf, promise, startTime, messagesRead, topic](Result
res, const Message& msg) {
auto self = weakSelf.lock();
if (!self || res != ResultOk) {
- promise.setFailed(res);
+ promise.setFailed(Error{res, ""});
LOG_ERROR("Start table view failed, reader msg for
"
<< topic << " error: " <<
strResult(res));
} else {
diff --git a/lib/TableViewImpl.h b/lib/TableViewImpl.h
index eda5247..a968fa8 100644
--- a/lib/TableViewImpl.h
+++ b/lib/TableViewImpl.h
@@ -45,7 +45,7 @@ class TableViewImpl : public
std::enable_shared_from_this<TableViewImpl> {
~TableViewImpl(){};
- Future<Result, TableViewImplPtr> start();
+ Future<Error, TableViewImplPtr> start();
bool retrieveValue(const std::string& key, std::string& value);
@@ -77,7 +77,7 @@ class TableViewImpl : public
std::enable_shared_from_this<TableViewImpl> {
SynchronizedHashMap<std::string, std::string> data_;
void handleMessage(const Message& msg);
- void readAllExistingMessages(const Promise<Result, TableViewImplPtr>&
promise, long startTime,
+ void readAllExistingMessages(const Promise<Error, TableViewImplPtr>&
promise, long startTime,
long messagesRead);
void readTailMessage();
};
diff --git a/tests/AuthTokenTest.cc b/tests/AuthTokenTest.cc
index d38e62c..e2c4c16 100644
--- a/tests/AuthTokenTest.cc
+++ b/tests/AuthTokenTest.cc
@@ -54,6 +54,15 @@ std::string getToken() {
return str;
}
+template <typename T>
+void assertV2Error(const std::variant<Error, T>& result, Result expectedResult,
+ const std::string& expectedMessage) {
+ const auto* error = std::get_if<Error>(&result);
+ ASSERT_NE(nullptr, error);
+ EXPECT_EQ(expectedResult, error->result);
+ EXPECT_EQ(expectedMessage, error->message);
+}
+
TEST(AuthPluginToken, testToken) {
ClientConfiguration config = ClientConfiguration();
std::string token = getToken();
@@ -181,17 +190,17 @@ TEST(AuthPluginToken, testNoAuth) {
std::string topicName = "persistent://private/auth/test-token";
std::string subName = "subscription-name";
+ const std::string expectedErrorMessage = "Client is not authorized to Get
Partition Metadata";
Producer producer;
Result result = client.createProducer(topicName, producer);
ASSERT_EQ(ResultAuthorizationError, result);
- std::visit(overloaded{[](Error&& error) {
- ASSERT_EQ(ResultAuthorizationError,
error.result);
- ASSERT_EQ("Client is not authorized to Get
Partition Metadata", error.message);
- },
- [](auto&&) { FAIL(); }},
- client.createProducerV2(topicName, {}));
+ assertV2Error(client.createProducerV2(topicName, {}),
ResultAuthorizationError, expectedErrorMessage);
+ assertV2Error(client.subscribeV2(topicName, subName, {}),
ResultAuthorizationError, expectedErrorMessage);
+ assertV2Error(client.createReaderV2(topicName, MessageId::earliest(), {}),
ResultAuthorizationError,
+ expectedErrorMessage);
+ assertV2Error(client.createTableViewV2(topicName, {}),
ResultAuthorizationError, expectedErrorMessage);
Consumer consumer;
result = client.subscribe(topicName, subName, consumer);