This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new c64e0e9  Bump the C++ standard to 17 (#525)
c64e0e9 is described below

commit c64e0e9635333cf74ebdcd12d77afcf2ecc9a01a
Author: Yunze Xu <[email protected]>
AuthorDate: Wed Dec 3 20:07:35 2025 +0800

    Bump the C++ standard to 17 (#525)
    
    - Replace `boost::optional` with `std::optional`
    - Replace `boost::any` with `std::any`
    - Use `weak_from_this()` (from `std::enable_shared_from_this`) to create a `weak_ptr` instead of converting the result of `shared_from_this()`
    - Leverage `if` statements with initializers to simplify code
---
 CMakeLists.txt                      |  7 ++--
 LegacyFindPackages.cmake            |  5 +++
 README.md                           |  3 ++
 lib/AckGroupingTrackerEnabled.cc    |  5 ++-
 lib/ClientConnection.cc             | 19 ++++++-----
 lib/ClientConnection.h              | 18 +++++------
 lib/Commands.cc                     |  5 +--
 lib/Commands.h                      | 23 +++++++------
 lib/ConsumerImpl.cc                 | 64 ++++++++++++++++++-------------------
 lib/ConsumerImpl.h                  | 15 ++++-----
 lib/ConsumerImplBase.cc             |  7 ++--
 lib/HandlerBase.cc                  | 24 +++++++-------
 lib/HandlerBase.h                   | 12 ++++---
 lib/MultiTopicsConsumerImpl.cc      |  8 ++---
 lib/MultiTopicsConsumerImpl.h       |  6 ++--
 lib/NegativeAcksTracker.cc          |  2 +-
 lib/PeriodicTask.cc                 |  5 ++-
 lib/ProducerConfiguration.cc        |  4 +--
 lib/ProducerConfigurationImpl.h     |  9 +++---
 lib/ProducerImpl.cc                 |  6 ++--
 lib/ProducerImpl.h                  |  5 ++-
 lib/RetryableOperationCache.h       |  2 +-
 lib/SynchronizedHashMap.h           | 18 ++++++-----
 lib/TableViewImpl.cc                |  4 +--
 lib/UnAckedMessageTrackerEnabled.cc |  5 ++-
 lib/stats/ConsumerStatsImpl.cc      |  2 +-
 lib/stats/ProducerStatsImpl.cc      |  2 +-
 pkg/apk/build-apk.sh                |  4 +--
 pkg/mac/build-static-library.sh     |  4 +--
 tests/SynchronizedHashMapTest.cc    |  9 +++---
 version.txt                         |  2 +-
 31 files changed, 154 insertions(+), 150 deletions(-)

diff --git a/CMakeLists.txt b/CMakeLists.txt
index c261031..0a0bd91 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -83,6 +83,10 @@ set(THREADS_PREFER_PTHREAD_FLAG TRUE)
 find_package(Threads REQUIRED)
 MESSAGE(STATUS "Threads library: " ${CMAKE_THREAD_LIBS_INIT})
 
+if (NOT CMAKE_CXX_STANDARD)
+    set(CMAKE_CXX_STANDARD 17)
+endif ()
+
 # Compiler specific configuration:
 # 
https://stackoverflow.com/questions/10046114/in-cmake-how-can-i-test-if-the-compiler-is-clang
 if (CMAKE_CXX_COMPILER_ID STREQUAL "MSVC")
@@ -112,9 +116,6 @@ set(AUTOGEN_DIR ${PROJECT_BINARY_DIR}/generated)
 file(MAKE_DIRECTORY ${AUTOGEN_DIR})
 
 if (INTEGRATE_VCPKG)
-    if (NOT CMAKE_CXX_STANDARD)
-        set(CMAKE_CXX_STANDARD 11)
-    endif ()
     set(CMAKE_C_STANDARD 11)
     set(Boost_NO_BOOST_CMAKE ON)
     find_package(Boost REQUIRED)
diff --git a/LegacyFindPackages.cmake b/LegacyFindPackages.cmake
index 5004545..3fa78d6 100644
--- a/LegacyFindPackages.cmake
+++ b/LegacyFindPackages.cmake
@@ -270,6 +270,11 @@ if (MSVC)
         string(REGEX REPLACE "/MD" "/MT" CMAKE_CXX_FLAGS_DEBUG 
${CMAKE_CXX_FLAGS_DEBUG})
         string(REGEX REPLACE "/MD" "/MT" CMAKE_CXX_FLAGS_RELEASE 
${CMAKE_CXX_FLAGS_RELEASE})
         string(REGEX REPLACE "/MD" "/MT" CMAKE_CXX_FLAGS_RELWITHDEBINFO 
${CMAKE_CXX_FLAGS_RELWITHDEBINFO})
+        if (NOT CMAKE_CL_64)
+            # When building with a 32-bit cl.exe, the virtual address space is 
limited to 2GB, which could be
+            # reached with /O2 optimization. Use /Os for smaller code size.
+            string(REGEX REPLACE "/O2" "/Os" CMAKE_CXX_FLAGS_RELEASE 
${CMAKE_CXX_FLAGS_RELEASE})
+        endif ()
         message(STATUS "CMAKE_CXX_FLAGS_DEBUG: " ${CMAKE_CXX_FLAGS_DEBUG})
         message(STATUS "CMAKE_CXX_FLAGS_RELEASE: " ${CMAKE_CXX_FLAGS_RELEASE})
         message(STATUS "CMAKE_CXX_FLAGS_RELWITHDEBINFO: " 
${CMAKE_CXX_FLAGS_RELWITHDEBINFO})
diff --git a/README.md b/README.md
index 4c86b63..0bcae7b 100644
--- a/README.md
+++ b/README.md
@@ -57,6 +57,9 @@ cmake -B build -DINTEGRATE_VCPKG=ON
 cmake --build build -j8
 ```
 
+> - Before 4.0.0, C++11 is required.
+> - Since 4.0.0, C++17 is required.
+
 The 1st step will download vcpkg and then install all dependencies according 
to the version description in [vcpkg.json](./vcpkg.json). The 2nd step will 
build the Pulsar C++ libraries under `./build/lib/`, where `./build` is the 
CMake build directory.
 
 > You can also add the CMAKE_TOOLCHAIN_FILE option if your system already have 
 > vcpkg installed.
diff --git a/lib/AckGroupingTrackerEnabled.cc b/lib/AckGroupingTrackerEnabled.cc
index 3a2a35d..faeb0ad 100644
--- a/lib/AckGroupingTrackerEnabled.cc
+++ b/lib/AckGroupingTrackerEnabled.cc
@@ -198,10 +198,9 @@ void AckGroupingTrackerEnabled::scheduleTimer() {
     std::lock_guard<std::mutex> lock(this->mutexTimer_);
     this->timer_ = this->executor_->createDeadlineTimer();
     this->timer_->expires_after(std::chrono::milliseconds(std::max(1L, 
this->ackGroupingTimeMs_)));
-    std::weak_ptr<AckGroupingTracker> weakSelf = shared_from_this();
+    auto weakSelf = weak_from_this();
     this->timer_->async_wait([this, weakSelf](const ASIO_ERROR& ec) -> void {
-        auto self = weakSelf.lock();
-        if (self && !ec) {
+        if (auto self = weakSelf.lock(); self && !ec) {
             auto consumer = consumer_.lock();
             if (!consumer || consumer->isClosingOrClosed()) {
                 return;
diff --git a/lib/ClientConnection.cc b/lib/ClientConnection.cc
index 49b2cbc..0bd935d 100644
--- a/lib/ClientConnection.cc
+++ b/lib/ClientConnection.cc
@@ -21,7 +21,6 @@
 #include <openssl/x509.h>
 #include <pulsar/MessageIdBuilder.h>
 
-#include <boost/optional.hpp>
 #include <fstream>
 
 #include "AsioDefines.h"
@@ -1127,19 +1126,19 @@ void ClientConnection::sendPendingCommands() {
 
     if (--pendingWriteOperations_ > 0) {
         assert(!pendingWriteBuffers_.empty());
-        boost::any any = pendingWriteBuffers_.front();
+        auto any = pendingWriteBuffers_.front();
         pendingWriteBuffers_.pop_front();
 
         auto self = shared_from_this();
         if (any.type() == typeid(SharedBuffer)) {
-            SharedBuffer buffer = boost::any_cast<SharedBuffer>(any);
+            SharedBuffer buffer = std::any_cast<SharedBuffer>(any);
             asyncWrite(buffer.const_asio_buffer(),
                        customAllocWriteHandler(
                            [this, self, buffer](const ASIO_ERROR& err, size_t) 
{ handleSend(err, buffer); }));
         } else {
             assert(any.type() == typeid(std::shared_ptr<SendArguments>));
 
-            auto args = boost::any_cast<std::shared_ptr<SendArguments>>(any);
+            auto args = std::any_cast<std::shared_ptr<SendArguments>>(any);
             BaseCommand outgoingCmd;
             PairSharedBuffer buffer =
                 Commands::newSend(outgoingBuffer_, outgoingCmd, 
getChecksumType(), *args);
@@ -1702,9 +1701,9 @@ void ClientConnection::handleProducerSuccess(const 
proto::CommandProducerSuccess
                 data.schemaVersion = producerSuccess.schema_version();
             }
             if (producerSuccess.has_topic_epoch()) {
-                data.topicEpoch = 
boost::make_optional(producerSuccess.topic_epoch());
+                data.topicEpoch = 
std::make_optional(producerSuccess.topic_epoch());
             } else {
-                data.topicEpoch = boost::none;
+                data.topicEpoch = std::nullopt;
             }
             requestData.promise.setValue(data);
             cancelTimer(*requestData.timer);
@@ -1805,7 +1804,7 @@ void ClientConnection::handleTopicMigrated(const 
proto::CommandTopicMigrated& co
     }
 }
 
-boost::optional<std::string> ClientConnection::getAssignedBrokerServiceUrl(
+optional<std::string> ClientConnection::getAssignedBrokerServiceUrl(
     const proto::CommandCloseProducer& closeProducer) {
     if (tlsSocket_) {
         if (closeProducer.has_assignedbrokerserviceurltls()) {
@@ -1814,10 +1813,10 @@ boost::optional<std::string> 
ClientConnection::getAssignedBrokerServiceUrl(
     } else if (closeProducer.has_assignedbrokerserviceurl()) {
         return closeProducer.assignedbrokerserviceurl();
     }
-    return boost::none;
+    return {};
 }
 
-boost::optional<std::string> ClientConnection::getAssignedBrokerServiceUrl(
+optional<std::string> ClientConnection::getAssignedBrokerServiceUrl(
     const proto::CommandCloseConsumer& closeConsumer) {
     if (tlsSocket_) {
         if (closeConsumer.has_assignedbrokerserviceurltls()) {
@@ -1826,7 +1825,7 @@ boost::optional<std::string> 
ClientConnection::getAssignedBrokerServiceUrl(
     } else if (closeConsumer.has_assignedbrokerserviceurl()) {
         return closeConsumer.assignedbrokerserviceurl();
     }
-    return boost::none;
+    return {};
 }
 
 void ClientConnection::handleCloseProducer(const proto::CommandCloseProducer& 
closeProducer) {
diff --git a/lib/ClientConnection.h b/lib/ClientConnection.h
index cf6be65..18a7d84 100644
--- a/lib/ClientConnection.h
+++ b/lib/ClientConnection.h
@@ -22,6 +22,7 @@
 #include <pulsar/ClientConfiguration.h>
 #include <pulsar/defines.h>
 
+#include <any>
 #include <atomic>
 #include <cstdint>
 #ifdef USE_ASIO
@@ -37,8 +38,6 @@
 #include <boost/asio/ssl/stream.hpp>
 #include <boost/asio/strand.hpp>
 #endif
-#include <boost/any.hpp>
-#include <boost/optional.hpp>
 #include <deque>
 #include <functional>
 #include <memory>
@@ -53,6 +52,9 @@
 #include "SharedBuffer.h"
 #include "TimeUtils.h"
 #include "UtilAllocator.h"
+
+using std::optional;
+
 namespace pulsar {
 
 class PulsarFriend;
@@ -108,7 +110,7 @@ struct ResponseData {
     std::string producerName;
     int64_t lastSequenceId;
     std::string schemaVersion;
-    boost::optional<uint64_t> topicEpoch;
+    optional<uint64_t> topicEpoch;
 };
 
 typedef std::shared_ptr<std::vector<std::string>> NamespaceTopicsPtr;
@@ -141,10 +143,6 @@ class PULSAR_PUBLIC ClientConnection : public 
std::enable_shared_from_this<Clien
                      ConnectionPool& pool, size_t poolIndex);
     ~ClientConnection();
 
-#if __cplusplus < 201703L
-    std::weak_ptr<ClientConnection> weak_from_this() noexcept { return 
shared_from_this(); }
-#endif
-
     /*
      * starts tcp connect_async
      * @return future<ConnectionPtr> which is not yet set
@@ -378,7 +376,7 @@ class PULSAR_PUBLIC ClientConnection : public 
std::enable_shared_from_this<Clien
     typedef std::unique_lock<std::mutex> Lock;
 
     // Pending buffers to write on the socket
-    std::deque<boost::any> pendingWriteBuffers_;
+    std::deque<std::any> pendingWriteBuffers_;
     int pendingWriteOperations_ = 0;
 
     SharedBuffer outgoingBuffer_;
@@ -426,8 +424,8 @@ class PULSAR_PUBLIC ClientConnection : public 
std::enable_shared_from_this<Clien
     void handleGetTopicOfNamespaceResponse(const 
proto::CommandGetTopicsOfNamespaceResponse&);
     void handleGetSchemaResponse(const proto::CommandGetSchemaResponse&);
     void handleAckResponse(const proto::CommandAckResponse&);
-    boost::optional<std::string> getAssignedBrokerServiceUrl(const 
proto::CommandCloseProducer&);
-    boost::optional<std::string> getAssignedBrokerServiceUrl(const 
proto::CommandCloseConsumer&);
+    optional<std::string> getAssignedBrokerServiceUrl(const 
proto::CommandCloseProducer&);
+    optional<std::string> getAssignedBrokerServiceUrl(const 
proto::CommandCloseConsumer&);
     std::string getMigratedBrokerServiceUrl(const 
proto::CommandTopicMigrated&);
     // This method must be called when `mutex_` is held
     void unsafeRemovePendingRequest(long requestId);
diff --git a/lib/Commands.cc b/lib/Commands.cc
index dd62b21..3c687c0 100644
--- a/lib/Commands.cc
+++ b/lib/Commands.cc
@@ -34,6 +34,7 @@
 #include "OpSendMsg.h"
 #include "PulsarApi.pb.h"
 #include "Url.h"
+#include "boost/throw_exception.hpp"
 #include "checksum/ChecksumProvider.h"
 
 using namespace pulsar;
@@ -329,7 +330,7 @@ SharedBuffer Commands::newAuthResponse(const 
AuthenticationPtr& authentication,
 SharedBuffer Commands::newSubscribe(const std::string& topic, const 
std::string& subscription,
                                     uint64_t consumerId, uint64_t requestId, 
CommandSubscribe_SubType subType,
                                     const std::string& consumerName, 
SubscriptionMode subscriptionMode,
-                                    boost::optional<MessageId> startMessageId, 
bool readCompacted,
+                                    optional<MessageId> startMessageId, bool 
readCompacted,
                                     const std::map<std::string, std::string>& 
metadata,
                                     const std::map<std::string, std::string>& 
subscriptionProperties,
                                     const SchemaInfo& schemaInfo,
@@ -416,7 +417,7 @@ SharedBuffer Commands::newProducer(const std::string& 
topic, uint64_t producerId
                                    const std::map<std::string, std::string>& 
metadata,
                                    const SchemaInfo& schemaInfo, uint64_t 
epoch,
                                    bool userProvidedProducerName, bool 
encrypted,
-                                   ProducerAccessMode accessMode, 
boost::optional<uint64_t> topicEpoch,
+                                   ProducerAccessMode accessMode, 
optional<uint64_t> topicEpoch,
                                    const std::string& initialSubscriptionName) 
{
     BaseCommand cmd;
     cmd.set_type(BaseCommand::PRODUCER);
diff --git a/lib/Commands.h b/lib/Commands.h
index 15c3166..8403d6e 100644
--- a/lib/Commands.h
+++ b/lib/Commands.h
@@ -25,7 +25,7 @@
 #include <pulsar/Schema.h>
 #include <pulsar/defines.h>
 
-#include <boost/optional.hpp>
+#include <optional>
 #include <set>
 
 #include "ProtoApiEnums.h"
@@ -41,6 +41,7 @@ class MessageIdImpl;
 using MessageIdImplPtr = std::shared_ptr<MessageIdImpl>;
 class BitSet;
 struct SendArguments;
+using std::optional;
 
 namespace proto {
 class BaseCommand;
@@ -102,14 +103,16 @@ class Commands {
     static PairSharedBuffer newSend(SharedBuffer& headers, proto::BaseCommand& 
cmd, ChecksumType checksumType,
                                     const SendArguments& args);
 
-    static SharedBuffer newSubscribe(
-        const std::string& topic, const std::string& subscription, uint64_t 
consumerId, uint64_t requestId,
-        CommandSubscribe_SubType subType, const std::string& consumerName, 
SubscriptionMode subscriptionMode,
-        boost::optional<MessageId> startMessageId, bool readCompacted,
-        const std::map<std::string, std::string>& metadata,
-        const std::map<std::string, std::string>& subscriptionProperties, 
const SchemaInfo& schemaInfo,
-        CommandSubscribe_InitialPosition subscriptionInitialPosition, bool 
replicateSubscriptionState,
-        const KeySharedPolicy& keySharedPolicy, int priorityLevel = 0);
+    static SharedBuffer newSubscribe(const std::string& topic, const 
std::string& subscription,
+                                     uint64_t consumerId, uint64_t requestId,
+                                     CommandSubscribe_SubType subType, const 
std::string& consumerName,
+                                     SubscriptionMode subscriptionMode, 
optional<MessageId> startMessageId,
+                                     bool readCompacted, const 
std::map<std::string, std::string>& metadata,
+                                     const std::map<std::string, std::string>& 
subscriptionProperties,
+                                     const SchemaInfo& schemaInfo,
+                                     CommandSubscribe_InitialPosition 
subscriptionInitialPosition,
+                                     bool replicateSubscriptionState, const 
KeySharedPolicy& keySharedPolicy,
+                                     int priorityLevel = 0);
 
     static SharedBuffer newUnsubscribe(uint64_t consumerId, uint64_t 
requestId);
 
@@ -118,7 +121,7 @@ class Commands {
                                     const std::map<std::string, std::string>& 
metadata,
                                     const SchemaInfo& schemaInfo, uint64_t 
epoch,
                                     bool userProvidedProducerName, bool 
encrypted,
-                                    ProducerAccessMode accessMode, 
boost::optional<uint64_t> topicEpoch,
+                                    ProducerAccessMode accessMode, 
optional<uint64_t> topicEpoch,
                                     const std::string& 
initialSubscriptionName);
 
     static SharedBuffer newAck(uint64_t consumerId, int64_t ledgerId, int64_t 
entryId, const BitSet& ackSet,
diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc
index 3d5d294..4781e96 100644
--- a/lib/ConsumerImpl.cc
+++ b/lib/ConsumerImpl.cc
@@ -59,8 +59,7 @@ DECLARE_LOG_OBJECT()
 using std::chrono::milliseconds;
 using std::chrono::seconds;
 
-static boost::optional<MessageId> getStartMessageId(const 
boost::optional<MessageId>& startMessageId,
-                                                    bool inclusive) {
+static optional<MessageId> getStartMessageId(const optional<MessageId>& 
startMessageId, bool inclusive) {
     if (!inclusive || !startMessageId) {
         return startMessageId;
     }
@@ -69,7 +68,7 @@ static boost::optional<MessageId> getStartMessageId(const 
boost::optional<Messag
     auto chunkMsgIdImpl =
         dynamic_cast<const 
ChunkMessageIdImpl*>(Commands::getMessageIdImpl(startMessageId.value()).get());
     if (chunkMsgIdImpl) {
-        return 
boost::optional<MessageId>{chunkMsgIdImpl->getChunkedMessageIds().front()};
+        return 
optional<MessageId>{chunkMsgIdImpl->getChunkedMessageIds().front()};
     }
     return startMessageId;
 }
@@ -97,7 +96,7 @@ ConsumerImpl::ConsumerImpl(const ClientImplPtr& client, const 
std::string& topic
                            bool hasParent /* = false by default */,
                            const ConsumerTopicType consumerTopicType /* = 
NonPartitioned by default */,
                            Commands::SubscriptionMode subscriptionMode,
-                           const boost::optional<MessageId>& startMessageId)
+                           const optional<MessageId>& startMessageId)
     : ConsumerImplBase(
           client, topic,
           
Backoff(milliseconds(client->getClientConfig().getInitialBackoffIntervalMs()),
@@ -243,7 +242,7 @@ Future<Result, bool> ConsumerImpl::connectionOpened(const 
ClientConnectionPtr& c
     Lock lockForMessageId(mutexForMessageId_);
     clearReceiveQueue();
     const auto subscribeMessageId =
-        (subscriptionMode_ == Commands::SubscriptionModeNonDurable) ? 
startMessageId_.get() : boost::none;
+        (subscriptionMode_ == Commands::SubscriptionModeNonDurable) ? 
startMessageId_.get() : std::nullopt;
     lockForMessageId.unlock();
 
     unAckedMessageTrackerPtr_->clear();
@@ -433,7 +432,7 @@ void ConsumerImpl::discardChunkMessages(const std::string& 
uuid, const MessageId
 
 void ConsumerImpl::triggerCheckExpiredChunkedTimer() {
     
checkExpiredChunkedTimer_->expires_after(milliseconds(expireTimeOfIncompleteChunkedMessageMs_));
-    std::weak_ptr<ConsumerImplBase> weakSelf{shared_from_this()};
+    auto weakSelf = weak_from_this();
     checkExpiredChunkedTimer_->async_wait([this, weakSelf](const ASIO_ERROR& 
ec) -> void {
         auto self = weakSelf.lock();
         if (!self) {
@@ -464,11 +463,11 @@ void ConsumerImpl::triggerCheckExpiredChunkedTimer() {
     });
 }
 
-boost::optional<SharedBuffer> ConsumerImpl::processMessageChunk(const 
SharedBuffer& payload,
-                                                                const 
proto::MessageMetadata& metadata,
-                                                                const 
proto::MessageIdData& messageIdData,
-                                                                const 
ClientConnectionPtr& cnx,
-                                                                MessageId& 
messageId) {
+optional<SharedBuffer> ConsumerImpl::processMessageChunk(const SharedBuffer& 
payload,
+                                                         const 
proto::MessageMetadata& metadata,
+                                                         const 
proto::MessageIdData& messageIdData,
+                                                         const 
ClientConnectionPtr& cnx,
+                                                         MessageId& messageId) 
{
     const auto chunkId = metadata.chunk_id();
     const auto& uuid = metadata.uuid();
     LOG_DEBUG("Process message chunk (chunkId: " << chunkId << ", uuid: " << 
uuid
@@ -521,14 +520,14 @@ boost::optional<SharedBuffer> 
ConsumerImpl::processMessageChunk(const SharedBuff
         lock.unlock();
         increaseAvailablePermits(cnx);
         trackMessage(messageId);
-        return boost::none;
+        return {};
     }
 
     chunkedMsgCtx.appendChunk(messageId, payload);
     if (!chunkedMsgCtx.isCompleted()) {
         lock.unlock();
         increaseAvailablePermits(cnx);
-        return boost::none;
+        return {};
     }
 
     messageId = 
std::make_shared<ChunkMessageIdImpl>(chunkedMsgCtx.moveChunkedMessageIds())->build();
@@ -541,7 +540,7 @@ boost::optional<SharedBuffer> 
ConsumerImpl::processMessageChunk(const SharedBuff
     if (uncompressMessageIfNeeded(cnx, messageIdData, metadata, wholePayload, 
false)) {
         return wholePayload;
     } else {
-        return boost::none;
+        return {};
     }
 }
 
@@ -1124,7 +1123,7 @@ void ConsumerImpl::clearReceiveQueue() {
         if (hasSoughtByTimestamp()) {
             // Invalidate startMessageId_ so that isPriorBatchIndex and 
isPriorEntryIndex checks will be
             // skipped, and hasMessageAvailableAsync won't use startMessageId_ 
in compare.
-            startMessageId_ = boost::none;
+            startMessageId_ = std::nullopt;
         } else {
             startMessageId_ = seekMessageId_.get();
         }
@@ -1313,11 +1312,11 @@ void ConsumerImpl::negativeAcknowledge(const MessageId& 
messageId) {
     negativeAcksTracker_->add(messageId);
 }
 
-void ConsumerImpl::disconnectConsumer() { disconnectConsumer(boost::none); }
+void ConsumerImpl::disconnectConsumer() { disconnectConsumer(std::nullopt); }
 
-void ConsumerImpl::disconnectConsumer(const boost::optional<std::string>& 
assignedBrokerUrl) {
+void ConsumerImpl::disconnectConsumer(const optional<std::string>& 
assignedBrokerUrl) {
     LOG_INFO("Broker notification of Closed consumer: "
-             << consumerId_ << (assignedBrokerUrl ? (" assignedBrokerUrl: " + 
assignedBrokerUrl.get()) : ""));
+             << consumerId_ << (assignedBrokerUrl ? (" assignedBrokerUrl: " + 
*assignedBrokerUrl) : ""));
     resetCnx();
     scheduleReconnection(assignedBrokerUrl);
 }
@@ -1745,7 +1744,7 @@ void ConsumerImpl::seekAsyncInternal(long requestId, 
const SharedBuffer& seek, c
     seekCallback_ = callback;
     LOG_INFO(getName() << " Seeking subscription to " << seekArg);
 
-    std::weak_ptr<ConsumerImpl> weakSelf{get_shared_this_ptr()};
+    auto weakSelf = weak_from_this();
 
     cnx->sendRequestWithId(seek, requestId)
         .addListener([this, weakSelf, callback, originalSeekMessageId](Result 
result,
@@ -1851,9 +1850,9 @@ void ConsumerImpl::processPossibleToDLQ(const MessageId& 
messageId, const Proces
     }
 
     for (const auto& message : messages.value()) {
-        std::weak_ptr<ConsumerImpl> weakSelf{get_shared_this_ptr()};
-        deadLetterProducer_->getFuture().addListener([weakSelf, message, 
messageId, cb](Result res,
-                                                                               
         Producer producer) {
+        auto weakSelf = weak_from_this();
+        deadLetterProducer_->getFuture().addListener([this, weakSelf, message, 
messageId, cb](
+                                                         Result res, Producer 
producer) {
             auto self = weakSelf.lock();
             if (!self) {
                 return;
@@ -1872,30 +1871,29 @@ void ConsumerImpl::processPossibleToDLQ(const 
MessageId& messageId, const Proces
             if (message.hasOrderingKey()) {
                 msgBuilder.setOrderingKey(message.getOrderingKey());
             }
-            producer.sendAsync(msgBuilder.build(), [weakSelf, originMessageId, 
messageId, cb](
+            producer.sendAsync(msgBuilder.build(), [this, weakSelf, 
originMessageId, messageId, cb](
                                                        Result res, const 
MessageId& messageIdInDLQ) {
                 auto self = weakSelf.lock();
                 if (!self) {
                     return;
                 }
                 if (res == ResultOk) {
-                    if (self->state_ != Ready) {
+                    if (state_ != Ready) {
                         LOG_WARN(
                             "Send to the DLQ successfully, but consumer is not 
ready. ignore acknowledge : "
-                            << self->state_);
+                            << state_);
                         cb(false);
                         return;
                     }
-                    
self->possibleSendToDeadLetterTopicMessages_.remove(messageId);
-                    self->acknowledgeAsync(originMessageId, [weakSelf, 
originMessageId, cb](Result result) {
+                    possibleSendToDeadLetterTopicMessages_.remove(messageId);
+                    acknowledgeAsync(originMessageId, [this, weakSelf, 
originMessageId, cb](Result result) {
                         auto self = weakSelf.lock();
                         if (!self) {
                             return;
                         }
                         if (result != ResultOk) {
-                            LOG_WARN("{" << self->topic() << "} {" << 
self->subscription_ << "} {"
-                                         << self->getConsumerName() << "} 
Failed to acknowledge the message {"
-                                         << originMessageId
+                            LOG_WARN("{" << topic() << "} {" << subscription_ 
<< "} {" << getConsumerName()
+                                         << "} Failed to acknowledge the 
message {" << originMessageId
                                          << "} of the original topic but send 
to the DLQ successfully : "
                                          << result);
                             cb(false);
@@ -1906,9 +1904,9 @@ void ConsumerImpl::processPossibleToDLQ(const MessageId& 
messageId, const Proces
                         }
                     });
                 } else {
-                    LOG_WARN("{" << self->topic() << "} {" << 
self->subscription_ << "} {"
-                                 << self->getConsumerName() << "} Failed to 
send DLQ message to {"
-                                 << 
self->deadLetterPolicy_.getDeadLetterTopic() << "} for message id "
+                    LOG_WARN("{" << topic() << "} {" << subscription_ << "} {" 
<< getConsumerName()
+                                 << "} Failed to send DLQ message to {"
+                                 << deadLetterPolicy_.getDeadLetterTopic() << 
"} for message id "
                                  << "{" << originMessageId << "} : " << res);
                     cb(false);
                 }
diff --git a/lib/ConsumerImpl.h b/lib/ConsumerImpl.h
index 5e06723..c1df080 100644
--- a/lib/ConsumerImpl.h
+++ b/lib/ConsumerImpl.h
@@ -21,7 +21,6 @@
 
 #include <pulsar/Reader.h>
 
-#include <boost/optional.hpp>
 #include <boost/variant.hpp>
 #include <cstdint>
 #include <functional>
@@ -92,7 +91,7 @@ class ConsumerImpl : public ConsumerImplBase {
                  const ExecutorServicePtr& listenerExecutor = 
ExecutorServicePtr(), bool hasParent = false,
                  const ConsumerTopicType consumerTopicType = NonPartitioned,
                  Commands::SubscriptionMode = 
Commands::SubscriptionModeDurable,
-                 const boost::optional<MessageId>& startMessageId = 
boost::none);
+                 const optional<MessageId>& startMessageId = 
optional<MessageId>());
     ~ConsumerImpl();
     void setPartitionIndex(int partitionIndex);
     int getPartitionIndex();
@@ -146,7 +145,7 @@ class ConsumerImpl : public ConsumerImplBase {
     void hasMessageAvailableAsync(const HasMessageAvailableCallback& callback) 
override;
 
     virtual void disconnectConsumer();
-    virtual void disconnectConsumer(const boost::optional<std::string>& 
assignedBrokerUrl);
+    virtual void disconnectConsumer(const optional<std::string>& 
assignedBrokerUrl);
     Result fetchSingleMessageFromBroker(Message& msg);
 
     virtual bool isCumulativeAcknowledgementAllowed(ConsumerType consumerType);
@@ -270,7 +269,7 @@ class ConsumerImpl : public ConsumerImplBase {
 
     std::atomic<SeekStatus> seekStatus_{SeekStatus::NOT_STARTED};
     Synchronized<ResultCallback> seekCallback_{[](Result) {}};
-    Synchronized<boost::optional<MessageId>> startMessageId_;
+    Synchronized<optional<MessageId>> startMessageId_;
     Synchronized<MessageId> seekMessageId_{MessageId::earliest()};
     std::atomic<bool> hasSoughtByTimestamp_{false};
 
@@ -368,10 +367,10 @@ class ConsumerImpl : public ConsumerImplBase {
      * @return the concatenated payload if chunks are concatenated into a 
completed message payload
      *   successfully, else Optional::empty()
      */
-    boost::optional<SharedBuffer> processMessageChunk(const SharedBuffer& 
payload,
-                                                      const 
proto::MessageMetadata& metadata,
-                                                      const 
proto::MessageIdData& messageIdData,
-                                                      const 
ClientConnectionPtr& cnx, MessageId& messageId);
+    optional<SharedBuffer> processMessageChunk(const SharedBuffer& payload,
+                                               const proto::MessageMetadata& 
metadata,
+                                               const proto::MessageIdData& 
messageIdData,
+                                               const ClientConnectionPtr& cnx, 
MessageId& messageId);
 
     bool hasMoreMessages() const {
         std::lock_guard<std::mutex> lock{mutexForMessageId_};
diff --git a/lib/ConsumerImplBase.cc b/lib/ConsumerImplBase.cc
index 171256d..55d44d3 100644
--- a/lib/ConsumerImplBase.cc
+++ b/lib/ConsumerImplBase.cc
@@ -50,11 +50,10 @@ ConsumerImplBase::ConsumerImplBase(const ClientImplPtr& 
client, const std::strin
 void ConsumerImplBase::triggerBatchReceiveTimerTask(long timeoutMs) {
     if (timeoutMs > 0) {
         
batchReceiveTimer_->expires_after(std::chrono::milliseconds(timeoutMs));
-        std::weak_ptr<ConsumerImplBase> weakSelf{shared_from_this()};
+        auto weakSelf = weak_from_this();
         batchReceiveTimer_->async_wait([weakSelf](const ASIO_ERROR& ec) {
-            auto self = weakSelf.lock();
-            if (self && !ec) {
-                self->doBatchReceiveTimeTask();
+            if (auto self = weakSelf.lock(); self && !ec) {
+                
std::static_pointer_cast<ConsumerImplBase>(self)->doBatchReceiveTimeTask();
             }
         });
     }
diff --git a/lib/HandlerBase.cc b/lib/HandlerBase.cc
index ffc4e2c..37c6e2d 100644
--- a/lib/HandlerBase.cc
+++ b/lib/HandlerBase.cc
@@ -61,10 +61,9 @@ void HandlerBase::start() {
         grabCnx();
     }
     creationTimer_->expires_after(operationTimeut_);
-    std::weak_ptr<HandlerBase> weakSelf{shared_from_this()};
+    auto weakSelf = weak_from_this();
     creationTimer_->async_wait([this, weakSelf](const ASIO_ERROR& error) {
-        auto self = weakSelf.lock();
-        if (self && !error) {
+        if (auto self = weakSelf.lock(); self && !error) {
             LOG_WARN("Cancel the pending reconnection due to the start 
timeout");
             connectionFailed(ResultTimeout);
             cancelTimer(*timer_);
@@ -86,18 +85,18 @@ void HandlerBase::setCnx(const ClientConnectionPtr& cnx) {
     connection_ = cnx;
 }
 
-void HandlerBase::grabCnx() { grabCnx(boost::none); }
+void HandlerBase::grabCnx() { grabCnx(std::nullopt); }
 
 Future<Result, ClientConnectionPtr> HandlerBase::getConnection(
-    const ClientImplPtr& client, const boost::optional<std::string>& 
assignedBrokerUrl) {
+    const ClientImplPtr& client, const optional<std::string>& 
assignedBrokerUrl) {
     if (assignedBrokerUrl && client->getLookupCount() > 0) {
-        return client->connect(getRedirectedClusterURI(), 
assignedBrokerUrl.get(), connectionKeySuffix_);
+        return client->connect(getRedirectedClusterURI(), *assignedBrokerUrl, 
connectionKeySuffix_);
     } else {
         return client->getConnection(getRedirectedClusterURI(), topic(), 
connectionKeySuffix_);
     }
 }
 
-void HandlerBase::grabCnx(const boost::optional<std::string>& 
assignedBrokerUrl) {
+void HandlerBase::grabCnx(const optional<std::string>& assignedBrokerUrl) {
     bool expectedState = false;
     if (!reconnectionPending_.compare_exchange_strong(expectedState, true)) {
         LOG_INFO(getName() << "Ignoring reconnection attempt since there's 
already a pending reconnection");
@@ -177,8 +176,8 @@ void HandlerBase::handleDisconnection(Result result, const 
ClientConnectionPtr&
             break;
     }
 }
-void HandlerBase::scheduleReconnection() { scheduleReconnection(boost::none); }
-void HandlerBase::scheduleReconnection(const boost::optional<std::string>& 
assignedBrokerUrl) {
+void HandlerBase::scheduleReconnection() { scheduleReconnection(std::nullopt); 
}
+void HandlerBase::scheduleReconnection(const optional<std::string>& 
assignedBrokerUrl) {
     const auto state = state_.load();
 
     if (state == Pending || state == Ready) {
@@ -189,10 +188,9 @@ void HandlerBase::scheduleReconnection(const 
boost::optional<std::string>& assig
         // passing shared_ptr here since time_ will get destroyed, so tasks 
will be cancelled
         // so we will not run into the case where grabCnx is invoked on out of 
scope handler
         auto name = getName();
-        std::weak_ptr<HandlerBase> weakSelf{shared_from_this()};
+        auto weakSelf = weak_from_this();
         timer_->async_wait([name, weakSelf, assignedBrokerUrl](const 
ASIO_ERROR& ec) {
-            auto self = weakSelf.lock();
-            if (self) {
+            if (auto self = weakSelf.lock()) {
                 self->handleTimeout(ec, assignedBrokerUrl);
             } else {
                 LOG_WARN(name << "Cancel the reconnection since the handler is 
destroyed");
@@ -201,7 +199,7 @@ void HandlerBase::scheduleReconnection(const 
boost::optional<std::string>& assig
     }
 }
 
-void HandlerBase::handleTimeout(const ASIO_ERROR& ec, const 
boost::optional<std::string>& assignedBrokerUrl) {
+void HandlerBase::handleTimeout(const ASIO_ERROR& ec, const 
optional<std::string>& assignedBrokerUrl) {
     if (ec) {
         LOG_INFO(getName() << "Ignoring timer cancelled event, code[" << ec << 
"]");
     } else {
diff --git a/lib/HandlerBase.h b/lib/HandlerBase.h
index 967322f..acce15d 100644
--- a/lib/HandlerBase.h
+++ b/lib/HandlerBase.h
@@ -20,9 +20,9 @@
 #define _PULSAR_HANDLER_BASE_HEADER_
 #include <pulsar/Result.h>
 
-#include <boost/optional.hpp>
 #include <memory>
 #include <mutex>
+#include <optional>
 #include <string>
 
 #include "AsioTimer.h"
@@ -30,6 +30,8 @@
 #include "Future.h"
 #include "TimeUtils.h"
 
+using std::optional;
+
 namespace pulsar {
 
 class ClientImpl;
@@ -60,7 +62,7 @@ class HandlerBase : public 
std::enable_shared_from_this<HandlerBase> {
      * tries reconnection and sets connection_ to valid object
      * @param assignedBrokerUrl assigned broker url to directly connect to 
without lookup
      */
-    void grabCnx(const boost::optional<std::string>& assignedBrokerUrl);
+    void grabCnx(const optional<std::string>& assignedBrokerUrl);
 
     /*
      * tries reconnection and sets connection_ to valid object
@@ -71,7 +73,7 @@ class HandlerBase : public 
std::enable_shared_from_this<HandlerBase> {
      * Schedule reconnection after backoff time
      * @param assignedBrokerUrl assigned broker url to directly connect to 
without lookup
      */
-    void scheduleReconnection(const boost::optional<std::string>& 
assignedBrokerUrl);
+    void scheduleReconnection(const optional<std::string>& assignedBrokerUrl);
     /*
      * Schedule reconnection after backoff time
      */
@@ -108,11 +110,11 @@ class HandlerBase : public 
std::enable_shared_from_this<HandlerBase> {
     const std::shared_ptr<std::string> topic_;
 
     Future<Result, ClientConnectionPtr> getConnection(const ClientImplPtr& 
client,
-                                                      const 
boost::optional<std::string>& assignedBrokerUrl);
+                                                      const 
optional<std::string>& assignedBrokerUrl);
 
     void handleDisconnection(Result result, const ClientConnectionPtr& cnx);
 
-    void handleTimeout(const ASIO_ERROR& ec, const 
boost::optional<std::string>& assignedBrokerUrl);
+    void handleTimeout(const ASIO_ERROR& ec, const optional<std::string>& 
assignedBrokerUrl);
 
    protected:
     ClientImplWeakPtr client_;
diff --git a/lib/MultiTopicsConsumerImpl.cc b/lib/MultiTopicsConsumerImpl.cc
index 7d73403..6e0ba86 100644
--- a/lib/MultiTopicsConsumerImpl.cc
+++ b/lib/MultiTopicsConsumerImpl.cc
@@ -46,7 +46,7 @@ MultiTopicsConsumerImpl::MultiTopicsConsumerImpl(const 
ClientImplPtr& client, co
                                                  const LookupServicePtr& 
lookupServicePtr,
                                                  const 
ConsumerInterceptorsPtr& interceptors,
                                                  Commands::SubscriptionMode 
subscriptionMode,
-                                                 const 
boost::optional<MessageId>& startMessageId)
+                                                 const optional<MessageId>& 
startMessageId)
     : MultiTopicsConsumerImpl(client, {topicName->toString()}, 
subscriptionName, topicName, conf,
                               lookupServicePtr, interceptors, 
subscriptionMode, startMessageId) {
     topicsPartitions_[topicName->toString()] = numPartitions;
@@ -56,7 +56,7 @@ MultiTopicsConsumerImpl::MultiTopicsConsumerImpl(
     const ClientImplPtr& client, const std::vector<std::string>& topics, const 
std::string& subscriptionName,
     const TopicNamePtr& topicName, const ConsumerConfiguration& conf,
     const LookupServicePtr& lookupServicePtr, const ConsumerInterceptorsPtr& 
interceptors,
-    Commands::SubscriptionMode subscriptionMode, const 
boost::optional<MessageId>& startMessageId)
+    Commands::SubscriptionMode subscriptionMode, const optional<MessageId>& 
startMessageId)
     : ConsumerImplBase(client, topicName ? topicName->toString() : 
"EmptyTopics",
                        Backoff(milliseconds(100), seconds(60), 
milliseconds(0)), conf,
                        client->getListenerExecutorProvider()->get()),
@@ -448,7 +448,7 @@ void 
MultiTopicsConsumerImpl::handleOneTopicUnsubscribedAsync(
 }
 
 void MultiTopicsConsumerImpl::closeAsync(const ResultCallback& 
originalCallback) {
-    std::weak_ptr<MultiTopicsConsumerImpl> weakSelf{get_shared_this_ptr()};
+    auto weakSelf = weak_from_this();
     auto callback = [weakSelf, originalCallback](Result result) {
         auto self = weakSelf.lock();
         if (self) {
@@ -935,7 +935,7 @@ void MultiTopicsConsumerImpl::seekAsync(const MessageId& 
msgId, const ResultCall
 
     beforeSeek();
     auto weakSelf = weak_from_this();
-    optConsumer.get()->seekAsync(msgId, [this, weakSelf, callback](Result 
result) {
+    optConsumer.value()->seekAsync(msgId, [this, weakSelf, callback](Result 
result) {
         auto self = weakSelf.lock();
         if (self) {
             afterSeek();
diff --git a/lib/MultiTopicsConsumerImpl.h b/lib/MultiTopicsConsumerImpl.h
index e92ec0e..b22227e 100644
--- a/lib/MultiTopicsConsumerImpl.h
+++ b/lib/MultiTopicsConsumerImpl.h
@@ -58,14 +58,14 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase {
                             const LookupServicePtr& lookupServicePtr,
                             const ConsumerInterceptorsPtr& interceptors,
                             Commands::SubscriptionMode = 
Commands::SubscriptionModeDurable,
-                            const boost::optional<MessageId>& startMessageId = 
boost::none);
+                            const optional<MessageId>& startMessageId = 
optional<MessageId>{});
 
     MultiTopicsConsumerImpl(const ClientImplPtr& client, const 
std::vector<std::string>& topics,
                             const std::string& subscriptionName, const 
TopicNamePtr& topicName,
                             const ConsumerConfiguration& conf, const 
LookupServicePtr& lookupServicePtr_,
                             const ConsumerInterceptorsPtr& interceptors,
                             Commands::SubscriptionMode = 
Commands::SubscriptionModeDurable,
-                            const boost::optional<MessageId>& startMessageId = 
boost::none);
+                            const optional<MessageId>& startMessageId = 
optional<MessageId>{});
 
     ~MultiTopicsConsumerImpl();
     // overrided methods from ConsumerImplBase
@@ -131,7 +131,7 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase {
     const std::vector<std::string> topics_;
     std::queue<ReceiveCallback> pendingReceives_;
     const Commands::SubscriptionMode subscriptionMode_;
-    boost::optional<MessageId> startMessageId_;
+    optional<MessageId> startMessageId_;
     ConsumerInterceptorsPtr interceptors_;
     std::atomic_bool duringSeek_{false};
 
diff --git a/lib/NegativeAcksTracker.cc b/lib/NegativeAcksTracker.cc
index b691b18..cdff617 100644
--- a/lib/NegativeAcksTracker.cc
+++ b/lib/NegativeAcksTracker.cc
@@ -55,7 +55,7 @@ void NegativeAcksTracker::scheduleTimer() {
     if (closed_) {
         return;
     }
-    std::weak_ptr<NegativeAcksTracker> weakSelf{shared_from_this()};
+    auto weakSelf = weak_from_this();
     timer_->expires_after(timerInterval_);
     timer_->async_wait([weakSelf](const ASIO_ERROR &ec) {
         if (auto self = weakSelf.lock()) {
diff --git a/lib/PeriodicTask.cc b/lib/PeriodicTask.cc
index c68d23a..9cc6215 100644
--- a/lib/PeriodicTask.cc
+++ b/lib/PeriodicTask.cc
@@ -28,11 +28,10 @@ void PeriodicTask::start() {
     }
     state_ = Ready;
     if (periodMs_ >= 0) {
-        std::weak_ptr<PeriodicTask> weakSelf{shared_from_this()};
+        auto weakSelf = weak_from_this();
         timer_->expires_after(std::chrono::milliseconds(periodMs_));
         timer_->async_wait([weakSelf](const ErrorCode& ec) {
-            auto self = weakSelf.lock();
-            if (self) {
+            if (auto self = weakSelf.lock()) {
                 self->handleTimeout(ec);
             }
         });
diff --git a/lib/ProducerConfiguration.cc b/lib/ProducerConfiguration.cc
index 278871c..f2ac592 100644
--- a/lib/ProducerConfiguration.cc
+++ b/lib/ProducerConfiguration.cc
@@ -38,7 +38,7 @@ ProducerConfiguration& ProducerConfiguration::operator=(const 
ProducerConfigurat
 }
 
 ProducerConfiguration& ProducerConfiguration::setProducerName(const 
std::string& producerName) {
-    impl_->producerName = boost::make_optional(producerName);
+    impl_->producerName = std::make_optional(producerName);
     return *this;
 }
 
@@ -47,7 +47,7 @@ const std::string& ProducerConfiguration::getProducerName() 
const {
 }
 
 ProducerConfiguration& ProducerConfiguration::setInitialSequenceId(int64_t 
initialSequenceId) {
-    impl_->initialSequenceId = boost::make_optional(initialSequenceId);
+    impl_->initialSequenceId = std::make_optional(initialSequenceId);
     return *this;
 }
 
diff --git a/lib/ProducerConfigurationImpl.h b/lib/ProducerConfigurationImpl.h
index c324020..1ca2ce4 100644
--- a/lib/ProducerConfigurationImpl.h
+++ b/lib/ProducerConfigurationImpl.h
@@ -21,15 +21,16 @@
 
 #include <pulsar/ProducerConfiguration.h>
 
-#include <boost/optional.hpp>
-#include <memory>
+#include <optional>
+
+using std::optional;
 
 namespace pulsar {
 
 struct ProducerConfigurationImpl {
     SchemaInfo schemaInfo;
-    boost::optional<std::string> producerName;
-    boost::optional<int64_t> initialSequenceId;
+    optional<std::string> producerName;
+    optional<int64_t> initialSequenceId;
     int sendTimeoutMs{30000};
     CompressionType compressionType{CompressionNone};
     int maxPendingMessages{1000};
diff --git a/lib/ProducerImpl.cc b/lib/ProducerImpl.cc
index 21c38c4..9d6a9a0 100644
--- a/lib/ProducerImpl.cc
+++ b/lib/ProducerImpl.cc
@@ -974,14 +974,14 @@ bool ProducerImpl::encryptMessage(proto::MessageMetadata& 
metadata, SharedBuffer
                                encryptedPayload);
 }
 
-void ProducerImpl::disconnectProducer(const boost::optional<std::string>& 
assignedBrokerUrl) {
+void ProducerImpl::disconnectProducer(const optional<std::string>& 
assignedBrokerUrl) {
     LOG_INFO("Broker notification of Closed producer: "
-             << producerId_ << (assignedBrokerUrl ? (" assignedBrokerUrl: " + 
assignedBrokerUrl.get()) : ""));
+             << producerId_ << (assignedBrokerUrl ? (" assignedBrokerUrl: " + 
*assignedBrokerUrl) : ""));
     resetCnx();
     scheduleReconnection(assignedBrokerUrl);
 }
 
-void ProducerImpl::disconnectProducer() { disconnectProducer(boost::none); }
+void ProducerImpl::disconnectProducer() { disconnectProducer(std::nullopt); }
 
 void ProducerImpl::start() {
     HandlerBase::start();
diff --git a/lib/ProducerImpl.h b/lib/ProducerImpl.h
index 77bd6d1..26207f8 100644
--- a/lib/ProducerImpl.h
+++ b/lib/ProducerImpl.h
@@ -26,7 +26,6 @@
 #include <boost/asio/steady_timer.hpp>
 #endif
 #include <atomic>
-#include <boost/optional.hpp>
 #include <list>
 #include <memory>
 
@@ -99,7 +98,7 @@ class ProducerImpl : public HandlerBase, public 
ProducerImplBase {
 
     bool ackReceived(uint64_t sequenceId, MessageId& messageId);
 
-    virtual void disconnectProducer(const boost::optional<std::string>& 
assignedBrokerUrl);
+    virtual void disconnectProducer(const optional<std::string>& 
assignedBrokerUrl);
     virtual void disconnectProducer();
 
     uint64_t getProducerId() const;
@@ -209,7 +208,7 @@ class ProducerImpl : public HandlerBase, public 
ProducerImplBase {
 
     MemoryLimitController& memoryLimitController_;
     const bool chunkingEnabled_;
-    boost::optional<uint64_t> topicEpoch;
+    optional<uint64_t> topicEpoch;
 
     ProducerInterceptorsPtr interceptors_;
 
diff --git a/lib/RetryableOperationCache.h b/lib/RetryableOperationCache.h
index f2d390d..fa4c8fc 100644
--- a/lib/RetryableOperationCache.h
+++ b/lib/RetryableOperationCache.h
@@ -80,7 +80,7 @@ class RetryableOperationCache : public 
std::enable_shared_from_this<RetryableOpe
             operations_[key] = operation;
             lock.unlock();
 
-            std::weak_ptr<Self> weakSelf{this->shared_from_this()};
+            auto weakSelf = this->weak_from_this();
             future.addListener([this, weakSelf, key, operation](Result, const 
T&) {
                 auto self = weakSelf.lock();
                 if (!self) {
diff --git a/lib/SynchronizedHashMap.h b/lib/SynchronizedHashMap.h
index dacaf45..5f51cfd 100644
--- a/lib/SynchronizedHashMap.h
+++ b/lib/SynchronizedHashMap.h
@@ -19,14 +19,16 @@
 #pragma once
 
 #include <atomic>
-#include <boost/optional.hpp>
 #include <functional>
 #include <memory>
 #include <mutex>
+#include <optional>
 #include <unordered_map>
 #include <utility>
 #include <vector>
 
+using std::optional;
+
 namespace pulsar {
 
 class SharedFuture {
@@ -46,7 +48,7 @@ class SynchronizedHashMap {
     using Lock = std::lock_guard<MutexType>;
 
    public:
-    using OptValue = boost::optional<V>;
+    using OptValue = optional<V>;
     using PairVector = std::vector<std::pair<K, V>>;
     using MapType = std::unordered_map<K, V>;
     using Iterator = typename MapType::iterator;
@@ -60,12 +62,12 @@ class SynchronizedHashMap {
     }
 
     // Put a new key-value pair if the key does not exist.
-    // Return boost::none if the key already exists or the existing value.
+    // Return an empty optional if the key already exists or the existing 
value.
     OptValue putIfAbsent(const K& key, const V& value) {
         Lock lock(mutex_);
         auto pair = data_.emplace(key, value);
         if (pair.second) {
-            return boost::none;
+            return {};
         } else {
             return pair.first->second;
         }
@@ -157,7 +159,7 @@ class SynchronizedHashMap {
         if (it != data_.end()) {
             return it->second;
         } else {
-            return boost::none;
+            return {};
         }
     }
 
@@ -168,18 +170,18 @@ class SynchronizedHashMap {
                 return kv.second;
             }
         }
-        return boost::none;
+        return {};
     }
 
     OptValue remove(const K& key) {
         Lock lock(mutex_);
         auto it = data_.find(key);
         if (it != data_.end()) {
-            auto result = boost::make_optional(std::move(it->second));
+            auto result = std::make_optional(std::move(it->second));
             data_.erase(it);
             return result;
         } else {
-            return boost::none;
+            return {};
         }
     }
 
diff --git a/lib/TableViewImpl.cc b/lib/TableViewImpl.cc
index e283a6f..f634fa5 100644
--- a/lib/TableViewImpl.cc
+++ b/lib/TableViewImpl.cc
@@ -70,7 +70,7 @@ bool TableViewImpl::getValue(const std::string& key, 
std::string& value) const {
     return false;
 }
 
-bool TableViewImpl::containsKey(const std::string& key) const { return 
data_.find(key) != boost::none; }
+bool TableViewImpl::containsKey(const std::string& key) const { return 
static_cast<bool>(data_.find(key)); }
 
 std::unordered_map<std::string, std::string> TableViewImpl::snapshot() { 
return data_.move(); }
 
@@ -120,7 +120,7 @@ void TableViewImpl::handleMessage(const Message& msg) {
 
 void TableViewImpl::readAllExistingMessages(const Promise<Result, 
TableViewImplPtr>& promise, long startTime,
                                             long messagesRead) {
-    std::weak_ptr<TableViewImpl> weakSelf{shared_from_this()};
+    auto weakSelf = weak_from_this();
     reader_->hasMessageAvailableAsync(
         [weakSelf, promise, startTime, messagesRead](Result result, bool 
hasMessage) {
             auto self = weakSelf.lock();
diff --git a/lib/UnAckedMessageTrackerEnabled.cc 
b/lib/UnAckedMessageTrackerEnabled.cc
index 3e9ce0e..e5bd3d2 100644
--- a/lib/UnAckedMessageTrackerEnabled.cc
+++ b/lib/UnAckedMessageTrackerEnabled.cc
@@ -39,10 +39,9 @@ void UnAckedMessageTrackerEnabled::timeoutHandler() {
     ExecutorServicePtr executorService = 
client->getIOExecutorProvider()->get();
     timer_ = executorService->createDeadlineTimer();
     timer_->expires_after(std::chrono::milliseconds(tickDurationInMs_));
-    std::weak_ptr<UnAckedMessageTrackerEnabled> weakSelf{shared_from_this()};
+    auto weakSelf = weak_from_this();
     timer_->async_wait([weakSelf](const ASIO_ERROR& ec) {
-        auto self = weakSelf.lock();
-        if (self && !ec) {
+        if (auto self = weakSelf.lock(); self && !ec) {
             self->timeoutHandler();
         }
     });
diff --git a/lib/stats/ConsumerStatsImpl.cc b/lib/stats/ConsumerStatsImpl.cc
index 3dd1a73..ab4edd6 100644
--- a/lib/stats/ConsumerStatsImpl.cc
+++ b/lib/stats/ConsumerStatsImpl.cc
@@ -81,7 +81,7 @@ void ConsumerStatsImpl::messageAcknowledged(Result res, 
CommandAck_AckType ackTy
 
 void ConsumerStatsImpl::scheduleTimer() {
     timer_->expires_after(std::chrono::seconds(statsIntervalInSeconds_));
-    std::weak_ptr<ConsumerStatsImpl> weakSelf{shared_from_this()};
+    auto weakSelf = weak_from_this();
     timer_->async_wait([this, weakSelf](const ASIO_ERROR& ec) {
         auto self = weakSelf.lock();
         if (!self) {
diff --git a/lib/stats/ProducerStatsImpl.cc b/lib/stats/ProducerStatsImpl.cc
index a42532d..84bd1e2 100644
--- a/lib/stats/ProducerStatsImpl.cc
+++ b/lib/stats/ProducerStatsImpl.cc
@@ -110,7 +110,7 @@ ProducerStatsImpl::~ProducerStatsImpl() { 
cancelTimer(*timer_); }
 
 void ProducerStatsImpl::scheduleTimer() {
     timer_->expires_after(std::chrono::seconds(statsIntervalInSeconds_));
-    std::weak_ptr<ProducerStatsImpl> weakSelf{shared_from_this()};
+    auto weakSelf = weak_from_this();
     timer_->async_wait([this, weakSelf](const ASIO_ERROR& ec) {
         auto self = weakSelf.lock();
         if (!self) {
diff --git a/pkg/apk/build-apk.sh b/pkg/apk/build-apk.sh
index 68fdf89..4b80228 100755
--- a/pkg/apk/build-apk.sh
+++ b/pkg/apk/build-apk.sh
@@ -45,7 +45,7 @@ cp -r /root/packages/pkg ./build
 apk add --allow-untrusted build/$PLATFORM/*.apk
 
 cd $ROOT_DIR/win-examples
-g++ -o dynamic.out -std=c++11 ./example.cc -Wl,-rpath=/usr/lib -lpulsar
+g++ -o dynamic.out -std=c++17 ./example.cc -Wl,-rpath=/usr/lib -lpulsar
 ./dynamic.out
-g++ -o static.out -std=c++11 ./example.cc /usr/lib/libpulsarwithdeps.a 
-lpthread -ldl
+g++ -o static.out -std=c++17 ./example.cc /usr/lib/libpulsarwithdeps.a 
-lpthread -ldl
 ./static.out
diff --git a/pkg/mac/build-static-library.sh b/pkg/mac/build-static-library.sh
index 449222b..9190b86 100755
--- a/pkg/mac/build-static-library.sh
+++ b/pkg/mac/build-static-library.sh
@@ -70,7 +70,7 @@ cmake --build build-osx -j16 --target install
 cp ./build-osx/libpulsarwithdeps.a $INSTALL_DIR/lib/
 
 # Test the libraries
-clang++ win-examples/example.cc -o dynamic.out -std=c++11 -arch $ARCH -I 
$INSTALL_DIR/include -L $INSTALL_DIR/lib -Wl,-rpath $INSTALL_DIR/lib -lpulsar
+clang++ win-examples/example.cc -o dynamic.out -std=c++17 -arch $ARCH -I 
$INSTALL_DIR/include -L $INSTALL_DIR/lib -Wl,-rpath $INSTALL_DIR/lib -lpulsar
 ./dynamic.out
-clang++ win-examples/example.cc -o static.out -std=c++11 -arch $ARCH -I 
$INSTALL_DIR/include $INSTALL_DIR/lib/libpulsarwithdeps.a
+clang++ win-examples/example.cc -o static.out -std=c++17 -arch $ARCH -I 
$INSTALL_DIR/include $INSTALL_DIR/lib/libpulsarwithdeps.a
 ./static.out
diff --git a/tests/SynchronizedHashMapTest.cc b/tests/SynchronizedHashMapTest.cc
index 9bc1c52..f690fd4 100644
--- a/tests/SynchronizedHashMapTest.cc
+++ b/tests/SynchronizedHashMapTest.cc
@@ -20,7 +20,6 @@
 
 #include <algorithm>
 #include <atomic>
-#include <boost/optional/optional_io.hpp>
 #include <chrono>
 #include <thread>
 #include <vector>
@@ -101,8 +100,8 @@ TEST(SynchronizedHashMapTest, testForEach) {
     ASSERT_TRUE(values.empty());
     ASSERT_EQ(result, 1);
 
-    ASSERT_EQ(m.putIfAbsent(1, 100), boost::none);
-    ASSERT_EQ(m.putIfAbsent(1, 101), boost::optional<int>(100));
+    ASSERT_EQ(m.putIfAbsent(1, 100), optional<int>{});
+    ASSERT_EQ(m.putIfAbsent(1, 101), optional<int>(100));
     m.forEachValue([&values](int value, const SharedFuture&) { 
values.emplace_back(value); },
                    [&result] { result = 2; });
     ASSERT_EQ(values, (std::vector<int>({100})));
@@ -116,8 +115,8 @@ TEST(SynchronizedHashMapTest, testForEach) {
     ASSERT_EQ(result, 1);
 
     values.clear();
-    ASSERT_EQ(m.putIfAbsent(2, 200), boost::none);
-    ASSERT_EQ(m.putIfAbsent(2, 201), boost::optional<int>(200));
+    ASSERT_EQ(m.putIfAbsent(2, 200), optional<int>{});
+    ASSERT_EQ(m.putIfAbsent(2, 201), optional<int>(200));
     m.forEachValue([&values](int value, const SharedFuture&) { 
values.emplace_back(value); },
                    [&result] { result = 2; });
     std::sort(values.begin(), values.end());
diff --git a/version.txt b/version.txt
index 24a6729..8578fb2 100644
--- a/version.txt
+++ b/version.txt
@@ -1 +1 @@
-3.8.0-pre
+4.0.0-pre

Reply via email to