This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new c64e0e9 Bump the C++ standard to 17 (#525)
c64e0e9 is described below
commit c64e0e9635333cf74ebdcd12d77afcf2ecc9a01a
Author: Yunze Xu <[email protected]>
AuthorDate: Wed Dec 3 20:07:35 2025 +0800
Bump the C++ standard to 17 (#525)
- Replace `boost::optional` with `std::optional`
- Replace `boost::any` with `std::any`
- Use `std::weak_from_this` to create a `weak_ptr` from a `shared_ptr`
- Leverage initializers for if to simplify code
---
CMakeLists.txt | 7 ++--
LegacyFindPackages.cmake | 5 +++
README.md | 3 ++
lib/AckGroupingTrackerEnabled.cc | 5 ++-
lib/ClientConnection.cc | 19 ++++++-----
lib/ClientConnection.h | 18 +++++------
lib/Commands.cc | 5 +--
lib/Commands.h | 23 +++++++------
lib/ConsumerImpl.cc | 64 ++++++++++++++++++-------------------
lib/ConsumerImpl.h | 15 ++++-----
lib/ConsumerImplBase.cc | 7 ++--
lib/HandlerBase.cc | 24 +++++++-------
lib/HandlerBase.h | 12 ++++---
lib/MultiTopicsConsumerImpl.cc | 8 ++---
lib/MultiTopicsConsumerImpl.h | 6 ++--
lib/NegativeAcksTracker.cc | 2 +-
lib/PeriodicTask.cc | 5 ++-
lib/ProducerConfiguration.cc | 4 +--
lib/ProducerConfigurationImpl.h | 9 +++---
lib/ProducerImpl.cc | 6 ++--
lib/ProducerImpl.h | 5 ++-
lib/RetryableOperationCache.h | 2 +-
lib/SynchronizedHashMap.h | 18 ++++++-----
lib/TableViewImpl.cc | 4 +--
lib/UnAckedMessageTrackerEnabled.cc | 5 ++-
lib/stats/ConsumerStatsImpl.cc | 2 +-
lib/stats/ProducerStatsImpl.cc | 2 +-
pkg/apk/build-apk.sh | 4 +--
pkg/mac/build-static-library.sh | 4 +--
tests/SynchronizedHashMapTest.cc | 9 +++---
version.txt | 2 +-
31 files changed, 154 insertions(+), 150 deletions(-)
diff --git a/CMakeLists.txt b/CMakeLists.txt
index c261031..0a0bd91 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -83,6 +83,10 @@ set(THREADS_PREFER_PTHREAD_FLAG TRUE)
find_package(Threads REQUIRED)
MESSAGE(STATUS "Threads library: " ${CMAKE_THREAD_LIBS_INIT})
+if (NOT CMAKE_CXX_STANDARD)
+ set(CMAKE_CXX_STANDARD 17)
+endif ()
+
# Compiler specific configuration:
#
https://stackoverflow.com/questions/10046114/in-cmake-how-can-i-test-if-the-compiler-is-clang
if (CMAKE_CXX_COMPILER_ID STREQUAL "MSVC")
@@ -112,9 +116,6 @@ set(AUTOGEN_DIR ${PROJECT_BINARY_DIR}/generated)
file(MAKE_DIRECTORY ${AUTOGEN_DIR})
if (INTEGRATE_VCPKG)
- if (NOT CMAKE_CXX_STANDARD)
- set(CMAKE_CXX_STANDARD 11)
- endif ()
set(CMAKE_C_STANDARD 11)
set(Boost_NO_BOOST_CMAKE ON)
find_package(Boost REQUIRED)
diff --git a/LegacyFindPackages.cmake b/LegacyFindPackages.cmake
index 5004545..3fa78d6 100644
--- a/LegacyFindPackages.cmake
+++ b/LegacyFindPackages.cmake
@@ -270,6 +270,11 @@ if (MSVC)
string(REGEX REPLACE "/MD" "/MT" CMAKE_CXX_FLAGS_DEBUG
${CMAKE_CXX_FLAGS_DEBUG})
string(REGEX REPLACE "/MD" "/MT" CMAKE_CXX_FLAGS_RELEASE
${CMAKE_CXX_FLAGS_RELEASE})
string(REGEX REPLACE "/MD" "/MT" CMAKE_CXX_FLAGS_RELWITHDEBINFO
${CMAKE_CXX_FLAGS_RELWITHDEBINFO})
+ if (NOT CMAKE_CL_64)
+ # When building with a 32-bit cl.exe, the virtual address space is
limited to 2GB, which could be
+ # reached with /O2 optimization. Use /Os for smaller code size.
+ string(REGEX REPLACE "/O2" "/Os" CMAKE_CXX_FLAGS_RELEASE
${CMAKE_CXX_FLAGS_RELEASE})
+ endif ()
message(STATUS "CMAKE_CXX_FLAGS_DEBUG: " ${CMAKE_CXX_FLAGS_DEBUG})
message(STATUS "CMAKE_CXX_FLAGS_RELEASE: " ${CMAKE_CXX_FLAGS_RELEASE})
message(STATUS "CMAKE_CXX_FLAGS_RELWITHDEBINFO: "
${CMAKE_CXX_FLAGS_RELWITHDEBINFO})
diff --git a/README.md b/README.md
index 4c86b63..0bcae7b 100644
--- a/README.md
+++ b/README.md
@@ -57,6 +57,9 @@ cmake -B build -DINTEGRATE_VCPKG=ON
cmake --build build -j8
```
+> - Before 4.0.0, C++11 is required.
+> - Since 4.0.0, C++17 is required.
+
The 1st step will download vcpkg and then install all dependencies according
to the version description in [vcpkg.json](./vcpkg.json). The 2nd step will
build the Pulsar C++ libraries under `./build/lib/`, where `./build` is the
CMake build directory.
> You can also add the CMAKE_TOOLCHAIN_FILE option if your system already have
> vcpkg installed.
diff --git a/lib/AckGroupingTrackerEnabled.cc b/lib/AckGroupingTrackerEnabled.cc
index 3a2a35d..faeb0ad 100644
--- a/lib/AckGroupingTrackerEnabled.cc
+++ b/lib/AckGroupingTrackerEnabled.cc
@@ -198,10 +198,9 @@ void AckGroupingTrackerEnabled::scheduleTimer() {
std::lock_guard<std::mutex> lock(this->mutexTimer_);
this->timer_ = this->executor_->createDeadlineTimer();
this->timer_->expires_after(std::chrono::milliseconds(std::max(1L,
this->ackGroupingTimeMs_)));
- std::weak_ptr<AckGroupingTracker> weakSelf = shared_from_this();
+ auto weakSelf = weak_from_this();
this->timer_->async_wait([this, weakSelf](const ASIO_ERROR& ec) -> void {
- auto self = weakSelf.lock();
- if (self && !ec) {
+ if (auto self = weakSelf.lock(); self && !ec) {
auto consumer = consumer_.lock();
if (!consumer || consumer->isClosingOrClosed()) {
return;
diff --git a/lib/ClientConnection.cc b/lib/ClientConnection.cc
index 49b2cbc..0bd935d 100644
--- a/lib/ClientConnection.cc
+++ b/lib/ClientConnection.cc
@@ -21,7 +21,6 @@
#include <openssl/x509.h>
#include <pulsar/MessageIdBuilder.h>
-#include <boost/optional.hpp>
#include <fstream>
#include "AsioDefines.h"
@@ -1127,19 +1126,19 @@ void ClientConnection::sendPendingCommands() {
if (--pendingWriteOperations_ > 0) {
assert(!pendingWriteBuffers_.empty());
- boost::any any = pendingWriteBuffers_.front();
+ auto any = pendingWriteBuffers_.front();
pendingWriteBuffers_.pop_front();
auto self = shared_from_this();
if (any.type() == typeid(SharedBuffer)) {
- SharedBuffer buffer = boost::any_cast<SharedBuffer>(any);
+ SharedBuffer buffer = std::any_cast<SharedBuffer>(any);
asyncWrite(buffer.const_asio_buffer(),
customAllocWriteHandler(
[this, self, buffer](const ASIO_ERROR& err, size_t)
{ handleSend(err, buffer); }));
} else {
assert(any.type() == typeid(std::shared_ptr<SendArguments>));
- auto args = boost::any_cast<std::shared_ptr<SendArguments>>(any);
+ auto args = std::any_cast<std::shared_ptr<SendArguments>>(any);
BaseCommand outgoingCmd;
PairSharedBuffer buffer =
Commands::newSend(outgoingBuffer_, outgoingCmd,
getChecksumType(), *args);
@@ -1702,9 +1701,9 @@ void ClientConnection::handleProducerSuccess(const
proto::CommandProducerSuccess
data.schemaVersion = producerSuccess.schema_version();
}
if (producerSuccess.has_topic_epoch()) {
- data.topicEpoch =
boost::make_optional(producerSuccess.topic_epoch());
+ data.topicEpoch =
std::make_optional(producerSuccess.topic_epoch());
} else {
- data.topicEpoch = boost::none;
+ data.topicEpoch = std::nullopt;
}
requestData.promise.setValue(data);
cancelTimer(*requestData.timer);
@@ -1805,7 +1804,7 @@ void ClientConnection::handleTopicMigrated(const
proto::CommandTopicMigrated& co
}
}
-boost::optional<std::string> ClientConnection::getAssignedBrokerServiceUrl(
+optional<std::string> ClientConnection::getAssignedBrokerServiceUrl(
const proto::CommandCloseProducer& closeProducer) {
if (tlsSocket_) {
if (closeProducer.has_assignedbrokerserviceurltls()) {
@@ -1814,10 +1813,10 @@ boost::optional<std::string>
ClientConnection::getAssignedBrokerServiceUrl(
} else if (closeProducer.has_assignedbrokerserviceurl()) {
return closeProducer.assignedbrokerserviceurl();
}
- return boost::none;
+ return {};
}
-boost::optional<std::string> ClientConnection::getAssignedBrokerServiceUrl(
+optional<std::string> ClientConnection::getAssignedBrokerServiceUrl(
const proto::CommandCloseConsumer& closeConsumer) {
if (tlsSocket_) {
if (closeConsumer.has_assignedbrokerserviceurltls()) {
@@ -1826,7 +1825,7 @@ boost::optional<std::string>
ClientConnection::getAssignedBrokerServiceUrl(
} else if (closeConsumer.has_assignedbrokerserviceurl()) {
return closeConsumer.assignedbrokerserviceurl();
}
- return boost::none;
+ return {};
}
void ClientConnection::handleCloseProducer(const proto::CommandCloseProducer&
closeProducer) {
diff --git a/lib/ClientConnection.h b/lib/ClientConnection.h
index cf6be65..18a7d84 100644
--- a/lib/ClientConnection.h
+++ b/lib/ClientConnection.h
@@ -22,6 +22,7 @@
#include <pulsar/ClientConfiguration.h>
#include <pulsar/defines.h>
+#include <any>
#include <atomic>
#include <cstdint>
#ifdef USE_ASIO
@@ -37,8 +38,6 @@
#include <boost/asio/ssl/stream.hpp>
#include <boost/asio/strand.hpp>
#endif
-#include <boost/any.hpp>
-#include <boost/optional.hpp>
#include <deque>
#include <functional>
#include <memory>
@@ -53,6 +52,9 @@
#include "SharedBuffer.h"
#include "TimeUtils.h"
#include "UtilAllocator.h"
+
+using std::optional;
+
namespace pulsar {
class PulsarFriend;
@@ -108,7 +110,7 @@ struct ResponseData {
std::string producerName;
int64_t lastSequenceId;
std::string schemaVersion;
- boost::optional<uint64_t> topicEpoch;
+ optional<uint64_t> topicEpoch;
};
typedef std::shared_ptr<std::vector<std::string>> NamespaceTopicsPtr;
@@ -141,10 +143,6 @@ class PULSAR_PUBLIC ClientConnection : public
std::enable_shared_from_this<Clien
ConnectionPool& pool, size_t poolIndex);
~ClientConnection();
-#if __cplusplus < 201703L
- std::weak_ptr<ClientConnection> weak_from_this() noexcept { return
shared_from_this(); }
-#endif
-
/*
* starts tcp connect_async
* @return future<ConnectionPtr> which is not yet set
@@ -378,7 +376,7 @@ class PULSAR_PUBLIC ClientConnection : public
std::enable_shared_from_this<Clien
typedef std::unique_lock<std::mutex> Lock;
// Pending buffers to write on the socket
- std::deque<boost::any> pendingWriteBuffers_;
+ std::deque<std::any> pendingWriteBuffers_;
int pendingWriteOperations_ = 0;
SharedBuffer outgoingBuffer_;
@@ -426,8 +424,8 @@ class PULSAR_PUBLIC ClientConnection : public
std::enable_shared_from_this<Clien
void handleGetTopicOfNamespaceResponse(const
proto::CommandGetTopicsOfNamespaceResponse&);
void handleGetSchemaResponse(const proto::CommandGetSchemaResponse&);
void handleAckResponse(const proto::CommandAckResponse&);
- boost::optional<std::string> getAssignedBrokerServiceUrl(const
proto::CommandCloseProducer&);
- boost::optional<std::string> getAssignedBrokerServiceUrl(const
proto::CommandCloseConsumer&);
+ optional<std::string> getAssignedBrokerServiceUrl(const
proto::CommandCloseProducer&);
+ optional<std::string> getAssignedBrokerServiceUrl(const
proto::CommandCloseConsumer&);
std::string getMigratedBrokerServiceUrl(const
proto::CommandTopicMigrated&);
// This method must be called when `mutex_` is held
void unsafeRemovePendingRequest(long requestId);
diff --git a/lib/Commands.cc b/lib/Commands.cc
index dd62b21..3c687c0 100644
--- a/lib/Commands.cc
+++ b/lib/Commands.cc
@@ -34,6 +34,7 @@
#include "OpSendMsg.h"
#include "PulsarApi.pb.h"
#include "Url.h"
+#include "boost/throw_exception.hpp"
#include "checksum/ChecksumProvider.h"
using namespace pulsar;
@@ -329,7 +330,7 @@ SharedBuffer Commands::newAuthResponse(const
AuthenticationPtr& authentication,
SharedBuffer Commands::newSubscribe(const std::string& topic, const
std::string& subscription,
uint64_t consumerId, uint64_t requestId,
CommandSubscribe_SubType subType,
const std::string& consumerName,
SubscriptionMode subscriptionMode,
- boost::optional<MessageId> startMessageId,
bool readCompacted,
+ optional<MessageId> startMessageId, bool
readCompacted,
const std::map<std::string, std::string>&
metadata,
const std::map<std::string, std::string>&
subscriptionProperties,
const SchemaInfo& schemaInfo,
@@ -416,7 +417,7 @@ SharedBuffer Commands::newProducer(const std::string&
topic, uint64_t producerId
const std::map<std::string, std::string>&
metadata,
const SchemaInfo& schemaInfo, uint64_t
epoch,
bool userProvidedProducerName, bool
encrypted,
- ProducerAccessMode accessMode,
boost::optional<uint64_t> topicEpoch,
+ ProducerAccessMode accessMode,
optional<uint64_t> topicEpoch,
const std::string& initialSubscriptionName)
{
BaseCommand cmd;
cmd.set_type(BaseCommand::PRODUCER);
diff --git a/lib/Commands.h b/lib/Commands.h
index 15c3166..8403d6e 100644
--- a/lib/Commands.h
+++ b/lib/Commands.h
@@ -25,7 +25,7 @@
#include <pulsar/Schema.h>
#include <pulsar/defines.h>
-#include <boost/optional.hpp>
+#include <optional>
#include <set>
#include "ProtoApiEnums.h"
@@ -41,6 +41,7 @@ class MessageIdImpl;
using MessageIdImplPtr = std::shared_ptr<MessageIdImpl>;
class BitSet;
struct SendArguments;
+using std::optional;
namespace proto {
class BaseCommand;
@@ -102,14 +103,16 @@ class Commands {
static PairSharedBuffer newSend(SharedBuffer& headers, proto::BaseCommand&
cmd, ChecksumType checksumType,
const SendArguments& args);
- static SharedBuffer newSubscribe(
- const std::string& topic, const std::string& subscription, uint64_t
consumerId, uint64_t requestId,
- CommandSubscribe_SubType subType, const std::string& consumerName,
SubscriptionMode subscriptionMode,
- boost::optional<MessageId> startMessageId, bool readCompacted,
- const std::map<std::string, std::string>& metadata,
- const std::map<std::string, std::string>& subscriptionProperties,
const SchemaInfo& schemaInfo,
- CommandSubscribe_InitialPosition subscriptionInitialPosition, bool
replicateSubscriptionState,
- const KeySharedPolicy& keySharedPolicy, int priorityLevel = 0);
+ static SharedBuffer newSubscribe(const std::string& topic, const
std::string& subscription,
+ uint64_t consumerId, uint64_t requestId,
+ CommandSubscribe_SubType subType, const
std::string& consumerName,
+ SubscriptionMode subscriptionMode,
optional<MessageId> startMessageId,
+ bool readCompacted, const
std::map<std::string, std::string>& metadata,
+ const std::map<std::string, std::string>&
subscriptionProperties,
+ const SchemaInfo& schemaInfo,
+ CommandSubscribe_InitialPosition
subscriptionInitialPosition,
+ bool replicateSubscriptionState, const
KeySharedPolicy& keySharedPolicy,
+ int priorityLevel = 0);
static SharedBuffer newUnsubscribe(uint64_t consumerId, uint64_t
requestId);
@@ -118,7 +121,7 @@ class Commands {
const std::map<std::string, std::string>&
metadata,
const SchemaInfo& schemaInfo, uint64_t
epoch,
bool userProvidedProducerName, bool
encrypted,
- ProducerAccessMode accessMode,
boost::optional<uint64_t> topicEpoch,
+ ProducerAccessMode accessMode,
optional<uint64_t> topicEpoch,
const std::string&
initialSubscriptionName);
static SharedBuffer newAck(uint64_t consumerId, int64_t ledgerId, int64_t
entryId, const BitSet& ackSet,
diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc
index 3d5d294..4781e96 100644
--- a/lib/ConsumerImpl.cc
+++ b/lib/ConsumerImpl.cc
@@ -59,8 +59,7 @@ DECLARE_LOG_OBJECT()
using std::chrono::milliseconds;
using std::chrono::seconds;
-static boost::optional<MessageId> getStartMessageId(const
boost::optional<MessageId>& startMessageId,
- bool inclusive) {
+static optional<MessageId> getStartMessageId(const optional<MessageId>&
startMessageId, bool inclusive) {
if (!inclusive || !startMessageId) {
return startMessageId;
}
@@ -69,7 +68,7 @@ static boost::optional<MessageId> getStartMessageId(const
boost::optional<Messag
auto chunkMsgIdImpl =
dynamic_cast<const
ChunkMessageIdImpl*>(Commands::getMessageIdImpl(startMessageId.value()).get());
if (chunkMsgIdImpl) {
- return
boost::optional<MessageId>{chunkMsgIdImpl->getChunkedMessageIds().front()};
+ return
optional<MessageId>{chunkMsgIdImpl->getChunkedMessageIds().front()};
}
return startMessageId;
}
@@ -97,7 +96,7 @@ ConsumerImpl::ConsumerImpl(const ClientImplPtr& client, const
std::string& topic
bool hasParent /* = false by default */,
const ConsumerTopicType consumerTopicType /* =
NonPartitioned by default */,
Commands::SubscriptionMode subscriptionMode,
- const boost::optional<MessageId>& startMessageId)
+ const optional<MessageId>& startMessageId)
: ConsumerImplBase(
client, topic,
Backoff(milliseconds(client->getClientConfig().getInitialBackoffIntervalMs()),
@@ -243,7 +242,7 @@ Future<Result, bool> ConsumerImpl::connectionOpened(const
ClientConnectionPtr& c
Lock lockForMessageId(mutexForMessageId_);
clearReceiveQueue();
const auto subscribeMessageId =
- (subscriptionMode_ == Commands::SubscriptionModeNonDurable) ?
startMessageId_.get() : boost::none;
+ (subscriptionMode_ == Commands::SubscriptionModeNonDurable) ?
startMessageId_.get() : std::nullopt;
lockForMessageId.unlock();
unAckedMessageTrackerPtr_->clear();
@@ -433,7 +432,7 @@ void ConsumerImpl::discardChunkMessages(const std::string&
uuid, const MessageId
void ConsumerImpl::triggerCheckExpiredChunkedTimer() {
checkExpiredChunkedTimer_->expires_after(milliseconds(expireTimeOfIncompleteChunkedMessageMs_));
- std::weak_ptr<ConsumerImplBase> weakSelf{shared_from_this()};
+ auto weakSelf = weak_from_this();
checkExpiredChunkedTimer_->async_wait([this, weakSelf](const ASIO_ERROR&
ec) -> void {
auto self = weakSelf.lock();
if (!self) {
@@ -464,11 +463,11 @@ void ConsumerImpl::triggerCheckExpiredChunkedTimer() {
});
}
-boost::optional<SharedBuffer> ConsumerImpl::processMessageChunk(const
SharedBuffer& payload,
- const
proto::MessageMetadata& metadata,
- const
proto::MessageIdData& messageIdData,
- const
ClientConnectionPtr& cnx,
- MessageId&
messageId) {
+optional<SharedBuffer> ConsumerImpl::processMessageChunk(const SharedBuffer&
payload,
+ const
proto::MessageMetadata& metadata,
+ const
proto::MessageIdData& messageIdData,
+ const
ClientConnectionPtr& cnx,
+ MessageId& messageId)
{
const auto chunkId = metadata.chunk_id();
const auto& uuid = metadata.uuid();
LOG_DEBUG("Process message chunk (chunkId: " << chunkId << ", uuid: " <<
uuid
@@ -521,14 +520,14 @@ boost::optional<SharedBuffer>
ConsumerImpl::processMessageChunk(const SharedBuff
lock.unlock();
increaseAvailablePermits(cnx);
trackMessage(messageId);
- return boost::none;
+ return {};
}
chunkedMsgCtx.appendChunk(messageId, payload);
if (!chunkedMsgCtx.isCompleted()) {
lock.unlock();
increaseAvailablePermits(cnx);
- return boost::none;
+ return {};
}
messageId =
std::make_shared<ChunkMessageIdImpl>(chunkedMsgCtx.moveChunkedMessageIds())->build();
@@ -541,7 +540,7 @@ boost::optional<SharedBuffer>
ConsumerImpl::processMessageChunk(const SharedBuff
if (uncompressMessageIfNeeded(cnx, messageIdData, metadata, wholePayload,
false)) {
return wholePayload;
} else {
- return boost::none;
+ return {};
}
}
@@ -1124,7 +1123,7 @@ void ConsumerImpl::clearReceiveQueue() {
if (hasSoughtByTimestamp()) {
// Invalidate startMessageId_ so that isPriorBatchIndex and
isPriorEntryIndex checks will be
// skipped, and hasMessageAvailableAsync won't use startMessageId_
in compare.
- startMessageId_ = boost::none;
+ startMessageId_ = std::nullopt;
} else {
startMessageId_ = seekMessageId_.get();
}
@@ -1313,11 +1312,11 @@ void ConsumerImpl::negativeAcknowledge(const MessageId&
messageId) {
negativeAcksTracker_->add(messageId);
}
-void ConsumerImpl::disconnectConsumer() { disconnectConsumer(boost::none); }
+void ConsumerImpl::disconnectConsumer() { disconnectConsumer(std::nullopt); }
-void ConsumerImpl::disconnectConsumer(const boost::optional<std::string>&
assignedBrokerUrl) {
+void ConsumerImpl::disconnectConsumer(const optional<std::string>&
assignedBrokerUrl) {
LOG_INFO("Broker notification of Closed consumer: "
- << consumerId_ << (assignedBrokerUrl ? (" assignedBrokerUrl: " +
assignedBrokerUrl.get()) : ""));
+ << consumerId_ << (assignedBrokerUrl ? (" assignedBrokerUrl: " +
*assignedBrokerUrl) : ""));
resetCnx();
scheduleReconnection(assignedBrokerUrl);
}
@@ -1745,7 +1744,7 @@ void ConsumerImpl::seekAsyncInternal(long requestId,
const SharedBuffer& seek, c
seekCallback_ = callback;
LOG_INFO(getName() << " Seeking subscription to " << seekArg);
- std::weak_ptr<ConsumerImpl> weakSelf{get_shared_this_ptr()};
+ auto weakSelf = weak_from_this();
cnx->sendRequestWithId(seek, requestId)
.addListener([this, weakSelf, callback, originalSeekMessageId](Result
result,
@@ -1851,9 +1850,9 @@ void ConsumerImpl::processPossibleToDLQ(const MessageId&
messageId, const Proces
}
for (const auto& message : messages.value()) {
- std::weak_ptr<ConsumerImpl> weakSelf{get_shared_this_ptr()};
- deadLetterProducer_->getFuture().addListener([weakSelf, message,
messageId, cb](Result res,
-
Producer producer) {
+ auto weakSelf = weak_from_this();
+ deadLetterProducer_->getFuture().addListener([this, weakSelf, message,
messageId, cb](
+ Result res, Producer
producer) {
auto self = weakSelf.lock();
if (!self) {
return;
@@ -1872,30 +1871,29 @@ void ConsumerImpl::processPossibleToDLQ(const
MessageId& messageId, const Proces
if (message.hasOrderingKey()) {
msgBuilder.setOrderingKey(message.getOrderingKey());
}
- producer.sendAsync(msgBuilder.build(), [weakSelf, originMessageId,
messageId, cb](
+ producer.sendAsync(msgBuilder.build(), [this, weakSelf,
originMessageId, messageId, cb](
Result res, const
MessageId& messageIdInDLQ) {
auto self = weakSelf.lock();
if (!self) {
return;
}
if (res == ResultOk) {
- if (self->state_ != Ready) {
+ if (state_ != Ready) {
LOG_WARN(
"Send to the DLQ successfully, but consumer is not
ready. ignore acknowledge : "
- << self->state_);
+ << state_);
cb(false);
return;
}
-
self->possibleSendToDeadLetterTopicMessages_.remove(messageId);
- self->acknowledgeAsync(originMessageId, [weakSelf,
originMessageId, cb](Result result) {
+ possibleSendToDeadLetterTopicMessages_.remove(messageId);
+ acknowledgeAsync(originMessageId, [this, weakSelf,
originMessageId, cb](Result result) {
auto self = weakSelf.lock();
if (!self) {
return;
}
if (result != ResultOk) {
- LOG_WARN("{" << self->topic() << "} {" <<
self->subscription_ << "} {"
- << self->getConsumerName() << "}
Failed to acknowledge the message {"
- << originMessageId
+ LOG_WARN("{" << topic() << "} {" << subscription_
<< "} {" << getConsumerName()
+ << "} Failed to acknowledge the
message {" << originMessageId
<< "} of the original topic but send
to the DLQ successfully : "
<< result);
cb(false);
@@ -1906,9 +1904,9 @@ void ConsumerImpl::processPossibleToDLQ(const MessageId&
messageId, const Proces
}
});
} else {
- LOG_WARN("{" << self->topic() << "} {" <<
self->subscription_ << "} {"
- << self->getConsumerName() << "} Failed to
send DLQ message to {"
- <<
self->deadLetterPolicy_.getDeadLetterTopic() << "} for message id "
+ LOG_WARN("{" << topic() << "} {" << subscription_ << "} {"
<< getConsumerName()
+ << "} Failed to send DLQ message to {"
+ << deadLetterPolicy_.getDeadLetterTopic() <<
"} for message id "
<< "{" << originMessageId << "} : " << res);
cb(false);
}
diff --git a/lib/ConsumerImpl.h b/lib/ConsumerImpl.h
index 5e06723..c1df080 100644
--- a/lib/ConsumerImpl.h
+++ b/lib/ConsumerImpl.h
@@ -21,7 +21,6 @@
#include <pulsar/Reader.h>
-#include <boost/optional.hpp>
#include <boost/variant.hpp>
#include <cstdint>
#include <functional>
@@ -92,7 +91,7 @@ class ConsumerImpl : public ConsumerImplBase {
const ExecutorServicePtr& listenerExecutor =
ExecutorServicePtr(), bool hasParent = false,
const ConsumerTopicType consumerTopicType = NonPartitioned,
Commands::SubscriptionMode =
Commands::SubscriptionModeDurable,
- const boost::optional<MessageId>& startMessageId =
boost::none);
+ const optional<MessageId>& startMessageId =
optional<MessageId>());
~ConsumerImpl();
void setPartitionIndex(int partitionIndex);
int getPartitionIndex();
@@ -146,7 +145,7 @@ class ConsumerImpl : public ConsumerImplBase {
void hasMessageAvailableAsync(const HasMessageAvailableCallback& callback)
override;
virtual void disconnectConsumer();
- virtual void disconnectConsumer(const boost::optional<std::string>&
assignedBrokerUrl);
+ virtual void disconnectConsumer(const optional<std::string>&
assignedBrokerUrl);
Result fetchSingleMessageFromBroker(Message& msg);
virtual bool isCumulativeAcknowledgementAllowed(ConsumerType consumerType);
@@ -270,7 +269,7 @@ class ConsumerImpl : public ConsumerImplBase {
std::atomic<SeekStatus> seekStatus_{SeekStatus::NOT_STARTED};
Synchronized<ResultCallback> seekCallback_{[](Result) {}};
- Synchronized<boost::optional<MessageId>> startMessageId_;
+ Synchronized<optional<MessageId>> startMessageId_;
Synchronized<MessageId> seekMessageId_{MessageId::earliest()};
std::atomic<bool> hasSoughtByTimestamp_{false};
@@ -368,10 +367,10 @@ class ConsumerImpl : public ConsumerImplBase {
* @return the concatenated payload if chunks are concatenated into a
completed message payload
* successfully, else Optional::empty()
*/
- boost::optional<SharedBuffer> processMessageChunk(const SharedBuffer&
payload,
- const
proto::MessageMetadata& metadata,
- const
proto::MessageIdData& messageIdData,
- const
ClientConnectionPtr& cnx, MessageId& messageId);
+ optional<SharedBuffer> processMessageChunk(const SharedBuffer& payload,
+ const proto::MessageMetadata&
metadata,
+ const proto::MessageIdData&
messageIdData,
+ const ClientConnectionPtr& cnx,
MessageId& messageId);
bool hasMoreMessages() const {
std::lock_guard<std::mutex> lock{mutexForMessageId_};
diff --git a/lib/ConsumerImplBase.cc b/lib/ConsumerImplBase.cc
index 171256d..55d44d3 100644
--- a/lib/ConsumerImplBase.cc
+++ b/lib/ConsumerImplBase.cc
@@ -50,11 +50,10 @@ ConsumerImplBase::ConsumerImplBase(const ClientImplPtr&
client, const std::strin
void ConsumerImplBase::triggerBatchReceiveTimerTask(long timeoutMs) {
if (timeoutMs > 0) {
batchReceiveTimer_->expires_after(std::chrono::milliseconds(timeoutMs));
- std::weak_ptr<ConsumerImplBase> weakSelf{shared_from_this()};
+ auto weakSelf = weak_from_this();
batchReceiveTimer_->async_wait([weakSelf](const ASIO_ERROR& ec) {
- auto self = weakSelf.lock();
- if (self && !ec) {
- self->doBatchReceiveTimeTask();
+ if (auto self = weakSelf.lock(); self && !ec) {
+
std::static_pointer_cast<ConsumerImplBase>(self)->doBatchReceiveTimeTask();
}
});
}
diff --git a/lib/HandlerBase.cc b/lib/HandlerBase.cc
index ffc4e2c..37c6e2d 100644
--- a/lib/HandlerBase.cc
+++ b/lib/HandlerBase.cc
@@ -61,10 +61,9 @@ void HandlerBase::start() {
grabCnx();
}
creationTimer_->expires_after(operationTimeut_);
- std::weak_ptr<HandlerBase> weakSelf{shared_from_this()};
+ auto weakSelf = weak_from_this();
creationTimer_->async_wait([this, weakSelf](const ASIO_ERROR& error) {
- auto self = weakSelf.lock();
- if (self && !error) {
+ if (auto self = weakSelf.lock(); self && !error) {
LOG_WARN("Cancel the pending reconnection due to the start
timeout");
connectionFailed(ResultTimeout);
cancelTimer(*timer_);
@@ -86,18 +85,18 @@ void HandlerBase::setCnx(const ClientConnectionPtr& cnx) {
connection_ = cnx;
}
-void HandlerBase::grabCnx() { grabCnx(boost::none); }
+void HandlerBase::grabCnx() { grabCnx(std::nullopt); }
Future<Result, ClientConnectionPtr> HandlerBase::getConnection(
- const ClientImplPtr& client, const boost::optional<std::string>&
assignedBrokerUrl) {
+ const ClientImplPtr& client, const optional<std::string>&
assignedBrokerUrl) {
if (assignedBrokerUrl && client->getLookupCount() > 0) {
- return client->connect(getRedirectedClusterURI(),
assignedBrokerUrl.get(), connectionKeySuffix_);
+ return client->connect(getRedirectedClusterURI(), *assignedBrokerUrl,
connectionKeySuffix_);
} else {
return client->getConnection(getRedirectedClusterURI(), topic(),
connectionKeySuffix_);
}
}
-void HandlerBase::grabCnx(const boost::optional<std::string>&
assignedBrokerUrl) {
+void HandlerBase::grabCnx(const optional<std::string>& assignedBrokerUrl) {
bool expectedState = false;
if (!reconnectionPending_.compare_exchange_strong(expectedState, true)) {
LOG_INFO(getName() << "Ignoring reconnection attempt since there's
already a pending reconnection");
@@ -177,8 +176,8 @@ void HandlerBase::handleDisconnection(Result result, const
ClientConnectionPtr&
break;
}
}
-void HandlerBase::scheduleReconnection() { scheduleReconnection(boost::none); }
-void HandlerBase::scheduleReconnection(const boost::optional<std::string>&
assignedBrokerUrl) {
+void HandlerBase::scheduleReconnection() { scheduleReconnection(std::nullopt);
}
+void HandlerBase::scheduleReconnection(const optional<std::string>&
assignedBrokerUrl) {
const auto state = state_.load();
if (state == Pending || state == Ready) {
@@ -189,10 +188,9 @@ void HandlerBase::scheduleReconnection(const
boost::optional<std::string>& assig
// passing shared_ptr here since time_ will get destroyed, so tasks
will be cancelled
// so we will not run into the case where grabCnx is invoked on out of
scope handler
auto name = getName();
- std::weak_ptr<HandlerBase> weakSelf{shared_from_this()};
+ auto weakSelf = weak_from_this();
timer_->async_wait([name, weakSelf, assignedBrokerUrl](const
ASIO_ERROR& ec) {
- auto self = weakSelf.lock();
- if (self) {
+ if (auto self = weakSelf.lock()) {
self->handleTimeout(ec, assignedBrokerUrl);
} else {
LOG_WARN(name << "Cancel the reconnection since the handler is
destroyed");
@@ -201,7 +199,7 @@ void HandlerBase::scheduleReconnection(const
boost::optional<std::string>& assig
}
}
-void HandlerBase::handleTimeout(const ASIO_ERROR& ec, const
boost::optional<std::string>& assignedBrokerUrl) {
+void HandlerBase::handleTimeout(const ASIO_ERROR& ec, const
optional<std::string>& assignedBrokerUrl) {
if (ec) {
LOG_INFO(getName() << "Ignoring timer cancelled event, code[" << ec <<
"]");
} else {
diff --git a/lib/HandlerBase.h b/lib/HandlerBase.h
index 967322f..acce15d 100644
--- a/lib/HandlerBase.h
+++ b/lib/HandlerBase.h
@@ -20,9 +20,9 @@
#define _PULSAR_HANDLER_BASE_HEADER_
#include <pulsar/Result.h>
-#include <boost/optional.hpp>
#include <memory>
#include <mutex>
+#include <optional>
#include <string>
#include "AsioTimer.h"
@@ -30,6 +30,8 @@
#include "Future.h"
#include "TimeUtils.h"
+using std::optional;
+
namespace pulsar {
class ClientImpl;
@@ -60,7 +62,7 @@ class HandlerBase : public
std::enable_shared_from_this<HandlerBase> {
* tries reconnection and sets connection_ to valid object
* @param assignedBrokerUrl assigned broker url to directly connect to
without lookup
*/
- void grabCnx(const boost::optional<std::string>& assignedBrokerUrl);
+ void grabCnx(const optional<std::string>& assignedBrokerUrl);
/*
* tries reconnection and sets connection_ to valid object
@@ -71,7 +73,7 @@ class HandlerBase : public
std::enable_shared_from_this<HandlerBase> {
* Schedule reconnection after backoff time
* @param assignedBrokerUrl assigned broker url to directly connect to
without lookup
*/
- void scheduleReconnection(const boost::optional<std::string>&
assignedBrokerUrl);
+ void scheduleReconnection(const optional<std::string>& assignedBrokerUrl);
/*
* Schedule reconnection after backoff time
*/
@@ -108,11 +110,11 @@ class HandlerBase : public
std::enable_shared_from_this<HandlerBase> {
const std::shared_ptr<std::string> topic_;
Future<Result, ClientConnectionPtr> getConnection(const ClientImplPtr&
client,
- const
boost::optional<std::string>& assignedBrokerUrl);
+ const
optional<std::string>& assignedBrokerUrl);
void handleDisconnection(Result result, const ClientConnectionPtr& cnx);
- void handleTimeout(const ASIO_ERROR& ec, const
boost::optional<std::string>& assignedBrokerUrl);
+ void handleTimeout(const ASIO_ERROR& ec, const optional<std::string>&
assignedBrokerUrl);
protected:
ClientImplWeakPtr client_;
diff --git a/lib/MultiTopicsConsumerImpl.cc b/lib/MultiTopicsConsumerImpl.cc
index 7d73403..6e0ba86 100644
--- a/lib/MultiTopicsConsumerImpl.cc
+++ b/lib/MultiTopicsConsumerImpl.cc
@@ -46,7 +46,7 @@ MultiTopicsConsumerImpl::MultiTopicsConsumerImpl(const
ClientImplPtr& client, co
const LookupServicePtr&
lookupServicePtr,
const
ConsumerInterceptorsPtr& interceptors,
Commands::SubscriptionMode
subscriptionMode,
- const
boost::optional<MessageId>& startMessageId)
+ const optional<MessageId>&
startMessageId)
: MultiTopicsConsumerImpl(client, {topicName->toString()},
subscriptionName, topicName, conf,
lookupServicePtr, interceptors,
subscriptionMode, startMessageId) {
topicsPartitions_[topicName->toString()] = numPartitions;
@@ -56,7 +56,7 @@ MultiTopicsConsumerImpl::MultiTopicsConsumerImpl(
const ClientImplPtr& client, const std::vector<std::string>& topics, const
std::string& subscriptionName,
const TopicNamePtr& topicName, const ConsumerConfiguration& conf,
const LookupServicePtr& lookupServicePtr, const ConsumerInterceptorsPtr&
interceptors,
- Commands::SubscriptionMode subscriptionMode, const
boost::optional<MessageId>& startMessageId)
+ Commands::SubscriptionMode subscriptionMode, const optional<MessageId>&
startMessageId)
: ConsumerImplBase(client, topicName ? topicName->toString() :
"EmptyTopics",
Backoff(milliseconds(100), seconds(60),
milliseconds(0)), conf,
client->getListenerExecutorProvider()->get()),
@@ -448,7 +448,7 @@ void
MultiTopicsConsumerImpl::handleOneTopicUnsubscribedAsync(
}
void MultiTopicsConsumerImpl::closeAsync(const ResultCallback&
originalCallback) {
- std::weak_ptr<MultiTopicsConsumerImpl> weakSelf{get_shared_this_ptr()};
+ auto weakSelf = weak_from_this();
auto callback = [weakSelf, originalCallback](Result result) {
auto self = weakSelf.lock();
if (self) {
@@ -935,7 +935,7 @@ void MultiTopicsConsumerImpl::seekAsync(const MessageId&
msgId, const ResultCall
beforeSeek();
auto weakSelf = weak_from_this();
- optConsumer.get()->seekAsync(msgId, [this, weakSelf, callback](Result
result) {
+ optConsumer.value()->seekAsync(msgId, [this, weakSelf, callback](Result
result) {
auto self = weakSelf.lock();
if (self) {
afterSeek();
diff --git a/lib/MultiTopicsConsumerImpl.h b/lib/MultiTopicsConsumerImpl.h
index e92ec0e..b22227e 100644
--- a/lib/MultiTopicsConsumerImpl.h
+++ b/lib/MultiTopicsConsumerImpl.h
@@ -58,14 +58,14 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase {
const LookupServicePtr& lookupServicePtr,
const ConsumerInterceptorsPtr& interceptors,
Commands::SubscriptionMode =
Commands::SubscriptionModeDurable,
- const boost::optional<MessageId>& startMessageId =
boost::none);
+ const optional<MessageId>& startMessageId =
optional<MessageId>{});
MultiTopicsConsumerImpl(const ClientImplPtr& client, const
std::vector<std::string>& topics,
const std::string& subscriptionName, const
TopicNamePtr& topicName,
const ConsumerConfiguration& conf, const
LookupServicePtr& lookupServicePtr_,
const ConsumerInterceptorsPtr& interceptors,
Commands::SubscriptionMode =
Commands::SubscriptionModeDurable,
- const boost::optional<MessageId>& startMessageId =
boost::none);
+ const optional<MessageId>& startMessageId =
optional<MessageId>{});
~MultiTopicsConsumerImpl();
// overrided methods from ConsumerImplBase
@@ -131,7 +131,7 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase {
const std::vector<std::string> topics_;
std::queue<ReceiveCallback> pendingReceives_;
const Commands::SubscriptionMode subscriptionMode_;
- boost::optional<MessageId> startMessageId_;
+ optional<MessageId> startMessageId_;
ConsumerInterceptorsPtr interceptors_;
std::atomic_bool duringSeek_{false};
diff --git a/lib/NegativeAcksTracker.cc b/lib/NegativeAcksTracker.cc
index b691b18..cdff617 100644
--- a/lib/NegativeAcksTracker.cc
+++ b/lib/NegativeAcksTracker.cc
@@ -55,7 +55,7 @@ void NegativeAcksTracker::scheduleTimer() {
if (closed_) {
return;
}
- std::weak_ptr<NegativeAcksTracker> weakSelf{shared_from_this()};
+ auto weakSelf = weak_from_this();
timer_->expires_after(timerInterval_);
timer_->async_wait([weakSelf](const ASIO_ERROR &ec) {
if (auto self = weakSelf.lock()) {
diff --git a/lib/PeriodicTask.cc b/lib/PeriodicTask.cc
index c68d23a..9cc6215 100644
--- a/lib/PeriodicTask.cc
+++ b/lib/PeriodicTask.cc
@@ -28,11 +28,10 @@ void PeriodicTask::start() {
}
state_ = Ready;
if (periodMs_ >= 0) {
- std::weak_ptr<PeriodicTask> weakSelf{shared_from_this()};
+ auto weakSelf = weak_from_this();
timer_->expires_after(std::chrono::milliseconds(periodMs_));
timer_->async_wait([weakSelf](const ErrorCode& ec) {
- auto self = weakSelf.lock();
- if (self) {
+ if (auto self = weakSelf.lock()) {
self->handleTimeout(ec);
}
});
diff --git a/lib/ProducerConfiguration.cc b/lib/ProducerConfiguration.cc
index 278871c..f2ac592 100644
--- a/lib/ProducerConfiguration.cc
+++ b/lib/ProducerConfiguration.cc
@@ -38,7 +38,7 @@ ProducerConfiguration& ProducerConfiguration::operator=(const
ProducerConfigurat
}
ProducerConfiguration& ProducerConfiguration::setProducerName(const
std::string& producerName) {
- impl_->producerName = boost::make_optional(producerName);
+ impl_->producerName = std::make_optional(producerName);
return *this;
}
@@ -47,7 +47,7 @@ const std::string& ProducerConfiguration::getProducerName()
const {
}
ProducerConfiguration& ProducerConfiguration::setInitialSequenceId(int64_t
initialSequenceId) {
- impl_->initialSequenceId = boost::make_optional(initialSequenceId);
+ impl_->initialSequenceId = std::make_optional(initialSequenceId);
return *this;
}
diff --git a/lib/ProducerConfigurationImpl.h b/lib/ProducerConfigurationImpl.h
index c324020..1ca2ce4 100644
--- a/lib/ProducerConfigurationImpl.h
+++ b/lib/ProducerConfigurationImpl.h
@@ -21,15 +21,16 @@
#include <pulsar/ProducerConfiguration.h>
-#include <boost/optional.hpp>
-#include <memory>
+#include <optional>
+
+using std::optional;
namespace pulsar {
struct ProducerConfigurationImpl {
SchemaInfo schemaInfo;
- boost::optional<std::string> producerName;
- boost::optional<int64_t> initialSequenceId;
+ optional<std::string> producerName;
+ optional<int64_t> initialSequenceId;
int sendTimeoutMs{30000};
CompressionType compressionType{CompressionNone};
int maxPendingMessages{1000};
diff --git a/lib/ProducerImpl.cc b/lib/ProducerImpl.cc
index 21c38c4..9d6a9a0 100644
--- a/lib/ProducerImpl.cc
+++ b/lib/ProducerImpl.cc
@@ -974,14 +974,14 @@ bool ProducerImpl::encryptMessage(proto::MessageMetadata&
metadata, SharedBuffer
encryptedPayload);
}
-void ProducerImpl::disconnectProducer(const boost::optional<std::string>&
assignedBrokerUrl) {
+void ProducerImpl::disconnectProducer(const optional<std::string>&
assignedBrokerUrl) {
LOG_INFO("Broker notification of Closed producer: "
- << producerId_ << (assignedBrokerUrl ? (" assignedBrokerUrl: " +
assignedBrokerUrl.get()) : ""));
+ << producerId_ << (assignedBrokerUrl ? (" assignedBrokerUrl: " +
*assignedBrokerUrl) : ""));
resetCnx();
scheduleReconnection(assignedBrokerUrl);
}
-void ProducerImpl::disconnectProducer() { disconnectProducer(boost::none); }
+void ProducerImpl::disconnectProducer() { disconnectProducer(std::nullopt); }
void ProducerImpl::start() {
HandlerBase::start();
diff --git a/lib/ProducerImpl.h b/lib/ProducerImpl.h
index 77bd6d1..26207f8 100644
--- a/lib/ProducerImpl.h
+++ b/lib/ProducerImpl.h
@@ -26,7 +26,6 @@
#include <boost/asio/steady_timer.hpp>
#endif
#include <atomic>
-#include <boost/optional.hpp>
#include <list>
#include <memory>
@@ -99,7 +98,7 @@ class ProducerImpl : public HandlerBase, public
ProducerImplBase {
bool ackReceived(uint64_t sequenceId, MessageId& messageId);
- virtual void disconnectProducer(const boost::optional<std::string>&
assignedBrokerUrl);
+ virtual void disconnectProducer(const optional<std::string>&
assignedBrokerUrl);
virtual void disconnectProducer();
uint64_t getProducerId() const;
@@ -209,7 +208,7 @@ class ProducerImpl : public HandlerBase, public
ProducerImplBase {
MemoryLimitController& memoryLimitController_;
const bool chunkingEnabled_;
- boost::optional<uint64_t> topicEpoch;
+ optional<uint64_t> topicEpoch;
ProducerInterceptorsPtr interceptors_;
diff --git a/lib/RetryableOperationCache.h b/lib/RetryableOperationCache.h
index f2d390d..fa4c8fc 100644
--- a/lib/RetryableOperationCache.h
+++ b/lib/RetryableOperationCache.h
@@ -80,7 +80,7 @@ class RetryableOperationCache : public
std::enable_shared_from_this<RetryableOpe
operations_[key] = operation;
lock.unlock();
- std::weak_ptr<Self> weakSelf{this->shared_from_this()};
+ auto weakSelf = this->weak_from_this();
future.addListener([this, weakSelf, key, operation](Result, const
T&) {
auto self = weakSelf.lock();
if (!self) {
diff --git a/lib/SynchronizedHashMap.h b/lib/SynchronizedHashMap.h
index dacaf45..5f51cfd 100644
--- a/lib/SynchronizedHashMap.h
+++ b/lib/SynchronizedHashMap.h
@@ -19,14 +19,16 @@
#pragma once
#include <atomic>
-#include <boost/optional.hpp>
#include <functional>
#include <memory>
#include <mutex>
+#include <optional>
#include <unordered_map>
#include <utility>
#include <vector>
+using std::optional;
+
namespace pulsar {
class SharedFuture {
@@ -46,7 +48,7 @@ class SynchronizedHashMap {
using Lock = std::lock_guard<MutexType>;
public:
- using OptValue = boost::optional<V>;
+ using OptValue = optional<V>;
using PairVector = std::vector<std::pair<K, V>>;
using MapType = std::unordered_map<K, V>;
using Iterator = typename MapType::iterator;
@@ -60,12 +62,12 @@ class SynchronizedHashMap {
}
// Put a new key-value pair if the key does not exist.
- // Return boost::none if the key already exists or the existing value.
+ // Return an empty optional if the key already exists or the existing
value.
OptValue putIfAbsent(const K& key, const V& value) {
Lock lock(mutex_);
auto pair = data_.emplace(key, value);
if (pair.second) {
- return boost::none;
+ return {};
} else {
return pair.first->second;
}
@@ -157,7 +159,7 @@ class SynchronizedHashMap {
if (it != data_.end()) {
return it->second;
} else {
- return boost::none;
+ return {};
}
}
@@ -168,18 +170,18 @@ class SynchronizedHashMap {
return kv.second;
}
}
- return boost::none;
+ return {};
}
OptValue remove(const K& key) {
Lock lock(mutex_);
auto it = data_.find(key);
if (it != data_.end()) {
- auto result = boost::make_optional(std::move(it->second));
+ auto result = std::make_optional(std::move(it->second));
data_.erase(it);
return result;
} else {
- return boost::none;
+ return {};
}
}
diff --git a/lib/TableViewImpl.cc b/lib/TableViewImpl.cc
index e283a6f..f634fa5 100644
--- a/lib/TableViewImpl.cc
+++ b/lib/TableViewImpl.cc
@@ -70,7 +70,7 @@ bool TableViewImpl::getValue(const std::string& key,
std::string& value) const {
return false;
}
-bool TableViewImpl::containsKey(const std::string& key) const { return
data_.find(key) != boost::none; }
+bool TableViewImpl::containsKey(const std::string& key) const { return
static_cast<bool>(data_.find(key)); }
std::unordered_map<std::string, std::string> TableViewImpl::snapshot() {
return data_.move(); }
@@ -120,7 +120,7 @@ void TableViewImpl::handleMessage(const Message& msg) {
void TableViewImpl::readAllExistingMessages(const Promise<Result,
TableViewImplPtr>& promise, long startTime,
long messagesRead) {
- std::weak_ptr<TableViewImpl> weakSelf{shared_from_this()};
+ auto weakSelf = weak_from_this();
reader_->hasMessageAvailableAsync(
[weakSelf, promise, startTime, messagesRead](Result result, bool
hasMessage) {
auto self = weakSelf.lock();
diff --git a/lib/UnAckedMessageTrackerEnabled.cc
b/lib/UnAckedMessageTrackerEnabled.cc
index 3e9ce0e..e5bd3d2 100644
--- a/lib/UnAckedMessageTrackerEnabled.cc
+++ b/lib/UnAckedMessageTrackerEnabled.cc
@@ -39,10 +39,9 @@ void UnAckedMessageTrackerEnabled::timeoutHandler() {
ExecutorServicePtr executorService =
client->getIOExecutorProvider()->get();
timer_ = executorService->createDeadlineTimer();
timer_->expires_after(std::chrono::milliseconds(tickDurationInMs_));
- std::weak_ptr<UnAckedMessageTrackerEnabled> weakSelf{shared_from_this()};
+ auto weakSelf = weak_from_this();
timer_->async_wait([weakSelf](const ASIO_ERROR& ec) {
- auto self = weakSelf.lock();
- if (self && !ec) {
+ if (auto self = weakSelf.lock(); self && !ec) {
self->timeoutHandler();
}
});
diff --git a/lib/stats/ConsumerStatsImpl.cc b/lib/stats/ConsumerStatsImpl.cc
index 3dd1a73..ab4edd6 100644
--- a/lib/stats/ConsumerStatsImpl.cc
+++ b/lib/stats/ConsumerStatsImpl.cc
@@ -81,7 +81,7 @@ void ConsumerStatsImpl::messageAcknowledged(Result res,
CommandAck_AckType ackTy
void ConsumerStatsImpl::scheduleTimer() {
timer_->expires_after(std::chrono::seconds(statsIntervalInSeconds_));
- std::weak_ptr<ConsumerStatsImpl> weakSelf{shared_from_this()};
+ auto weakSelf = weak_from_this();
timer_->async_wait([this, weakSelf](const ASIO_ERROR& ec) {
auto self = weakSelf.lock();
if (!self) {
diff --git a/lib/stats/ProducerStatsImpl.cc b/lib/stats/ProducerStatsImpl.cc
index a42532d..84bd1e2 100644
--- a/lib/stats/ProducerStatsImpl.cc
+++ b/lib/stats/ProducerStatsImpl.cc
@@ -110,7 +110,7 @@ ProducerStatsImpl::~ProducerStatsImpl() {
cancelTimer(*timer_); }
void ProducerStatsImpl::scheduleTimer() {
timer_->expires_after(std::chrono::seconds(statsIntervalInSeconds_));
- std::weak_ptr<ProducerStatsImpl> weakSelf{shared_from_this()};
+ auto weakSelf = weak_from_this();
timer_->async_wait([this, weakSelf](const ASIO_ERROR& ec) {
auto self = weakSelf.lock();
if (!self) {
diff --git a/pkg/apk/build-apk.sh b/pkg/apk/build-apk.sh
index 68fdf89..4b80228 100755
--- a/pkg/apk/build-apk.sh
+++ b/pkg/apk/build-apk.sh
@@ -45,7 +45,7 @@ cp -r /root/packages/pkg ./build
apk add --allow-untrusted build/$PLATFORM/*.apk
cd $ROOT_DIR/win-examples
-g++ -o dynamic.out -std=c++11 ./example.cc -Wl,-rpath=/usr/lib -lpulsar
+g++ -o dynamic.out -std=c++17 ./example.cc -Wl,-rpath=/usr/lib -lpulsar
./dynamic.out
-g++ -o static.out -std=c++11 ./example.cc /usr/lib/libpulsarwithdeps.a
-lpthread -ldl
+g++ -o static.out -std=c++17 ./example.cc /usr/lib/libpulsarwithdeps.a
-lpthread -ldl
./static.out
diff --git a/pkg/mac/build-static-library.sh b/pkg/mac/build-static-library.sh
index 449222b..9190b86 100755
--- a/pkg/mac/build-static-library.sh
+++ b/pkg/mac/build-static-library.sh
@@ -70,7 +70,7 @@ cmake --build build-osx -j16 --target install
cp ./build-osx/libpulsarwithdeps.a $INSTALL_DIR/lib/
# Test the libraries
-clang++ win-examples/example.cc -o dynamic.out -std=c++11 -arch $ARCH -I
$INSTALL_DIR/include -L $INSTALL_DIR/lib -Wl,-rpath $INSTALL_DIR/lib -lpulsar
+clang++ win-examples/example.cc -o dynamic.out -std=c++17 -arch $ARCH -I
$INSTALL_DIR/include -L $INSTALL_DIR/lib -Wl,-rpath $INSTALL_DIR/lib -lpulsar
./dynamic.out
-clang++ win-examples/example.cc -o static.out -std=c++11 -arch $ARCH -I
$INSTALL_DIR/include $INSTALL_DIR/lib/libpulsarwithdeps.a
+clang++ win-examples/example.cc -o static.out -std=c++17 -arch $ARCH -I
$INSTALL_DIR/include $INSTALL_DIR/lib/libpulsarwithdeps.a
./static.out
diff --git a/tests/SynchronizedHashMapTest.cc b/tests/SynchronizedHashMapTest.cc
index 9bc1c52..f690fd4 100644
--- a/tests/SynchronizedHashMapTest.cc
+++ b/tests/SynchronizedHashMapTest.cc
@@ -20,7 +20,6 @@
#include <algorithm>
#include <atomic>
-#include <boost/optional/optional_io.hpp>
#include <chrono>
#include <thread>
#include <vector>
@@ -101,8 +100,8 @@ TEST(SynchronizedHashMapTest, testForEach) {
ASSERT_TRUE(values.empty());
ASSERT_EQ(result, 1);
- ASSERT_EQ(m.putIfAbsent(1, 100), boost::none);
- ASSERT_EQ(m.putIfAbsent(1, 101), boost::optional<int>(100));
+ ASSERT_EQ(m.putIfAbsent(1, 100), optional<int>{});
+ ASSERT_EQ(m.putIfAbsent(1, 101), optional<int>(100));
m.forEachValue([&values](int value, const SharedFuture&) {
values.emplace_back(value); },
[&result] { result = 2; });
ASSERT_EQ(values, (std::vector<int>({100})));
@@ -116,8 +115,8 @@ TEST(SynchronizedHashMapTest, testForEach) {
ASSERT_EQ(result, 1);
values.clear();
- ASSERT_EQ(m.putIfAbsent(2, 200), boost::none);
- ASSERT_EQ(m.putIfAbsent(2, 201), boost::optional<int>(200));
+ ASSERT_EQ(m.putIfAbsent(2, 200), optional<int>{});
+ ASSERT_EQ(m.putIfAbsent(2, 201), optional<int>(200));
m.forEachValue([&values](int value, const SharedFuture&) {
values.emplace_back(value); },
[&result] { result = 2; });
std::sort(values.begin(), values.end());
diff --git a/version.txt b/version.txt
index 24a6729..8578fb2 100644
--- a/version.txt
+++ b/version.txt
@@ -1 +1 @@
-3.8.0-pre
+4.0.0-pre