This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new f2f9f65 Adapt to latest Asio APIs (Asio 1.32 or Boost.Asio 1.88) (#477)
f2f9f65 is described below
commit f2f9f65683123cfcdcefff0dde669969a476b347
Author: Yunze Xu <[email protected]>
AuthorDate: Sat Oct 4 13:01:26 2025 +0800
Adapt to latest Asio APIs (Asio 1.32 or Boost.Asio 1.88) (#477)
---
.github/workflows/ci-pr-validation.yaml | 5 ++
CMakeLists.txt | 10 ++-
lib/AckGroupingTrackerEnabled.cc | 5 +-
lib/AsioTimer.h | 9 +++
lib/ClientConnection.cc | 118 ++++++++++++--------------------
lib/ClientConnection.h | 10 +--
lib/ConsumerImpl.cc | 9 ++-
lib/ConsumerImplBase.cc | 2 +-
lib/ExecutorService.cc | 37 +++++-----
lib/ExecutorService.h | 12 ++--
lib/HandlerBase.cc | 15 ++--
lib/JsonUtils.h | 7 --
lib/MultiTopicsConsumerImpl.cc | 7 +-
lib/Murmur3_32Hash.cc | 6 +-
lib/NegativeAcksTracker.cc | 5 +-
lib/PartitionedProducerImpl.cc | 5 +-
lib/PatternMultiTopicsConsumerImpl.cc | 9 +--
lib/PeriodicTask.cc | 7 +-
lib/ProducerImpl.cc | 11 ++-
lib/RetryableOperation.h | 5 +-
lib/SharedBuffer.h | 6 +-
lib/UnAckedMessageTrackerEnabled.cc | 5 +-
lib/checksum/crc32c_sse42.cc | 12 +---
lib/stats/ConsumerStatsImpl.cc | 4 +-
lib/stats/ConsumerStatsImpl.h | 5 +-
lib/stats/ProducerStatsImpl.cc | 4 +-
lib/stats/ProducerStatsImpl.h | 7 +-
tests/AuthPluginTest.cc | 2 +-
tests/ConsumerTest.h | 4 +-
vcpkg | 2 +-
vcpkg.json | 23 ++++---
31 files changed, 158 insertions(+), 210 deletions(-)
diff --git a/.github/workflows/ci-pr-validation.yaml b/.github/workflows/ci-pr-validation.yaml
index 157cf25..4efebfc 100644
--- a/.github/workflows/ci-pr-validation.yaml
+++ b/.github/workflows/ci-pr-validation.yaml
@@ -142,6 +142,11 @@ jobs:
- name: Run unit tests
run: RETRY_FAILED=3 CMAKE_BUILD_DIRECTORY=./build ./run-unit-tests.sh
+ - name: Build with Boost.Asio
+ run: |
+ cmake -B build-boost-asio -DINTEGRATE_VCPKG=ON -DBUILD_TESTS=ON
+ cmake --build build-boost-asio -j8
+
- name: Build perf tools
run: |
cmake -B build -DINTEGRATE_VCPKG=ON -DBUILD_TESTS=ON -DBUILD_PERF_TOOLS=ON
diff --git a/CMakeLists.txt b/CMakeLists.txt
index de9a245..792c2ef 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -19,15 +19,19 @@
cmake_minimum_required(VERSION 3.13)
-option(USE_ASIO "Use Asio instead of Boost.Asio" OFF)
-
option(INTEGRATE_VCPKG "Integrate with Vcpkg" OFF)
if (INTEGRATE_VCPKG)
- set(USE_ASIO ON)
+ option(USE_ASIO "Use Asio instead of Boost.Asio" ON)
if (NOT CMAKE_TOOLCHAIN_FILE)
set(CMAKE_TOOLCHAIN_FILE "${CMAKE_SOURCE_DIR}/vcpkg/scripts/buildsystems/vcpkg.cmake")
endif ()
+ if (NOT USE_ASIO)
+ list(APPEND VCPKG_MANIFEST_FEATURES "boost-asio")
+ endif ()
+else ()
+ option(USE_ASIO "Use Asio instead of Boost.Asio" OFF)
endif ()
+message(STATUS "USE_ASIO: ${USE_ASIO}")
option(BUILD_TESTS "Build tests" ON)
message(STATUS "BUILD_TESTS: " ${BUILD_TESTS})
diff --git a/lib/AckGroupingTrackerEnabled.cc b/lib/AckGroupingTrackerEnabled.cc
index 72f46f2..d88426e 100644
--- a/lib/AckGroupingTrackerEnabled.cc
+++ b/lib/AckGroupingTrackerEnabled.cc
@@ -121,8 +121,7 @@ AckGroupingTrackerEnabled::~AckGroupingTrackerEnabled() {
this->flush();
std::lock_guard<std::mutex> lock(this->mutexTimer_);
if (this->timer_) {
- ASIO_ERROR ec;
- this->timer_->cancel(ec);
+ cancelTimer(*this->timer_);
}
}
@@ -172,7 +171,7 @@ void AckGroupingTrackerEnabled::scheduleTimer() {
std::lock_guard<std::mutex> lock(this->mutexTimer_);
this->timer_ = this->executor_->createDeadlineTimer();
- this->timer_->expires_from_now(std::chrono::milliseconds(std::max(1L,
this->ackGroupingTimeMs_)));
+ this->timer_->expires_after(std::chrono::milliseconds(std::max(1L,
this->ackGroupingTimeMs_)));
std::weak_ptr<AckGroupingTracker> weakSelf = shared_from_this();
this->timer_->async_wait([this, weakSelf](const ASIO_ERROR& ec) -> void {
auto self = weakSelf.lock();
diff --git a/lib/AsioTimer.h b/lib/AsioTimer.h
index d0c3de5..56cd353 100644
--- a/lib/AsioTimer.h
+++ b/lib/AsioTimer.h
@@ -29,3 +29,12 @@
#include "AsioDefines.h"
using DeadlineTimerPtr = std::shared_ptr<ASIO::steady_timer>;
+
+inline void cancelTimer(ASIO::steady_timer& timer) {
+ try {
+ timer.cancel();
+ } catch (const ASIO_SYSTEM_ERROR& ignored) {
+ // Most of the time the exception can be ignored unless the following logic depends on the fact that
+ // the timer is cancelled.
+ }
+}
diff --git a/lib/ClientConnection.cc b/lib/ClientConnection.cc
index 9c7537a..49b2cbc 100644
--- a/lib/ClientConnection.cc
+++ b/lib/ClientConnection.cc
@@ -41,6 +41,14 @@
#include "auth/InitialAuthData.h"
#include "checksum/ChecksumProvider.h"
+#ifdef USE_ASIO
+#include <asio/connect.hpp>
+#include <asio/ssl/host_name_verification.hpp>
+#else
+#include <boost/asio/connect.hpp>
+#include <boost/asio/ssl/host_name_verification.hpp>
+#endif
+
DECLARE_LOG_OBJECT()
using namespace ASIO::ip;
@@ -170,13 +178,7 @@ ClientConnection::ClientConnection(const std::string&
logicalAddress, const std:
executor_(executor),
resolver_(executor_->createTcpResolver()),
socket_(executor_->createSocket()),
-#if defined(USE_ASIO) || BOOST_VERSION >= 107000
strand_(ASIO::make_strand(executor_->getIOService().get_executor())),
-#elif BOOST_VERSION >= 106600
- strand_(executor_->getIOService().get_executor()),
-#else
- strand_(executor_->getIOService()),
-#endif
logicalAddress_(logicalAddress),
physicalAddress_(physicalAddress),
cnxString_("[<none> -> " + physicalAddress + "] "),
@@ -266,7 +268,7 @@ ClientConnection::ClientConnection(const std::string&
logicalAddress, const std:
if (!clientConfiguration.isTlsAllowInsecureConnection() &&
clientConfiguration.isValidateHostName()) {
LOG_DEBUG("Validating hostname for " << serviceUrl.host() << ":"
<< serviceUrl.port());
std::string urlHost = isSniProxy_ ? proxyUrl.host() :
serviceUrl.host();
-
tlsSocket_->set_verify_callback(ASIO::ssl::rfc2818_verification(urlHost));
+
tlsSocket_->set_verify_callback(ASIO::ssl::host_name_verification(urlHost));
}
LOG_DEBUG("TLS SNI Host: " << serviceUrl.host());
@@ -309,7 +311,7 @@ void ClientConnection::handlePulsarConnected(const
proto::CommandConnected& cmdC
// Only send keep-alive probes if the broker supports it
keepAliveTimer_ = executor_->createDeadlineTimer();
if (keepAliveTimer_) {
-
keepAliveTimer_->expires_from_now(std::chrono::seconds(keepAliveIntervalInSeconds_));
+
keepAliveTimer_->expires_after(std::chrono::seconds(keepAliveIntervalInSeconds_));
auto weakSelf = weak_from_this();
keepAliveTimer_->async_wait([weakSelf](const ASIO_ERROR&) {
auto self = weakSelf.lock();
@@ -354,7 +356,7 @@ void
ClientConnection::startConsumerStatsTimer(std::vector<uint64_t> consumerSta
// If the close operation has reset the consumerStatsRequestTimer_ then
the use_count will be zero
// Check if we have a timer still before we set the request timer to pop
again.
if (consumerStatsRequestTimer_) {
- consumerStatsRequestTimer_->expires_from_now(operationsTimeout_);
+ consumerStatsRequestTimer_->expires_after(operationsTimeout_);
auto weakSelf = weak_from_this();
consumerStatsRequestTimer_->async_wait([weakSelf,
consumerStatsRequests](const ASIO_ERROR& err) {
auto self = weakSelf.lock();
@@ -394,7 +396,7 @@ typedef ASIO::detail::socket_option::integer<IPPROTO_TCP,
TCP_KEEPIDLE> tcp_keep
* if async_connect without any error, connected_ would be set to true
* at this point the connection is deemed valid to be used by clients of this
class
*/
-void ClientConnection::handleTcpConnected(const ASIO_ERROR& err,
tcp::resolver::iterator endpointIterator) {
+void ClientConnection::handleTcpConnected(const ASIO_ERROR& err, const
tcp::endpoint& endpoint) {
if (!err) {
std::stringstream cnxStringStream;
try {
@@ -479,38 +481,13 @@ void ClientConnection::handleTcpConnected(const
ASIO_ERROR& err, tcp::resolver::
} else {
handleHandshake(ASIO_SUCCESS);
}
- } else if (endpointIterator != tcp::resolver::iterator()) {
- LOG_WARN(cnxString_ << "Failed to establish connection: " <<
err.message());
- // The connection failed. Try the next endpoint in the list.
- ASIO_ERROR closeError;
- socket_->close(closeError); // ignore the error of close
- if (closeError) {
- LOG_WARN(cnxString_ << "Failed to close socket: " <<
err.message());
- }
- connectTimeoutTask_->stop();
- ++endpointIterator;
- if (endpointIterator != tcp::resolver::iterator()) {
- LOG_DEBUG(cnxString_ << "Connecting to " <<
endpointIterator->endpoint() << "...");
- connectTimeoutTask_->start();
- tcp::endpoint endpoint = *endpointIterator;
- auto weakSelf = weak_from_this();
- socket_->async_connect(endpoint, [weakSelf,
endpointIterator](const ASIO_ERROR& err) {
- auto self = weakSelf.lock();
- if (self) {
- self->handleTcpConnected(err, endpointIterator);
- }
- });
- } else {
- if (err == ASIO::error::operation_aborted) {
- // TCP connect timeout, which is not retryable
- close();
- } else {
- close(ResultRetryable);
- }
- }
} else {
LOG_ERROR(cnxString_ << "Failed to establish connection: " <<
err.message());
- close(ResultRetryable);
+ if (err == ASIO::error::operation_aborted) {
+ close();
+ } else {
+ close(ResultRetryable);
+ }
}
}
@@ -603,18 +580,18 @@ void ClientConnection::tcpConnectAsync() {
}
LOG_DEBUG(cnxString_ << "Resolving " << service_url.host() << ":" <<
service_url.port());
- tcp::resolver::query query(service_url.host(),
std::to_string(service_url.port()));
+
auto weakSelf = weak_from_this();
- resolver_->async_resolve(query,
- [weakSelf](const ASIO_ERROR& err, const
tcp::resolver::iterator& iterator) {
+ resolver_->async_resolve(service_url.host(),
std::to_string(service_url.port()),
+ [weakSelf](const ASIO_ERROR& err,
tcp::resolver::results_type results) {
auto self = weakSelf.lock();
if (self) {
- self->handleResolve(err, iterator);
+ self->handleResolve(err, results);
}
});
}
-void ClientConnection::handleResolve(const ASIO_ERROR& err, const
tcp::resolver::iterator& endpointIterator) {
+void ClientConnection::handleResolve(ASIO_ERROR err, const
tcp::resolver::results_type& results) {
if (err) {
std::string hostUrl = isSniProxy_ ? cnxString_ : proxyServiceUrl_;
LOG_ERROR(hostUrl << "Resolve error: " << err << " : " <<
err.message());
@@ -641,23 +618,13 @@ void ClientConnection::handleResolve(const ASIO_ERROR&
err, const tcp::resolver:
}
ptr->connectTimeoutTask_->stop();
});
-
- LOG_DEBUG(cnxString_ << "Connecting to " << endpointIterator->endpoint()
<< "...");
connectTimeoutTask_->start();
- if (endpointIterator != tcp::resolver::iterator()) {
- LOG_DEBUG(cnxString_ << "Resolved hostname " <<
endpointIterator->host_name() //
- << " to " << endpointIterator->endpoint());
- socket_->async_connect(*endpointIterator, [weakSelf,
endpointIterator](const ASIO_ERROR& err) {
- auto self = weakSelf.lock();
- if (self) {
- self->handleTcpConnected(err, endpointIterator);
- }
- });
- } else {
- LOG_WARN(cnxString_ << "No IP address found");
- close();
- return;
- }
+ ASIO::async_connect(*socket_, results, [weakSelf](const ASIO_ERROR& err,
const tcp::endpoint& endpoint) {
+ auto self = weakSelf.lock();
+ if (self) {
+ self->handleTcpConnected(err, endpoint);
+ }
+ });
}
void ClientConnection::readNextCommand() {
@@ -1061,7 +1028,7 @@ void ClientConnection::newLookup(const SharedBuffer& cmd,
uint64_t requestId,
LookupRequestData requestData;
requestData.promise = promise;
requestData.timer = executor_->createDeadlineTimer();
- requestData.timer->expires_from_now(operationsTimeout_);
+ requestData.timer->expires_after(operationsTimeout_);
auto weakSelf = weak_from_this();
requestData.timer->async_wait([weakSelf, requestData](const ASIO_ERROR&
ec) {
auto self = weakSelf.lock();
@@ -1201,7 +1168,7 @@ Future<Result, ResponseData>
ClientConnection::sendRequestWithId(const SharedBuf
PendingRequestData requestData;
requestData.timer = executor_->createDeadlineTimer();
- requestData.timer->expires_from_now(operationsTimeout_);
+ requestData.timer->expires_after(operationsTimeout_);
auto weakSelf = weak_from_this();
requestData.timer->async_wait([weakSelf, requestData](const ASIO_ERROR&
ec) {
auto self = weakSelf.lock();
@@ -1256,7 +1223,7 @@ void ClientConnection::handleKeepAliveTimeout() {
// be zero And we do not attempt to dereference the pointer.
Lock lock(mutex_);
if (keepAliveTimer_) {
-
keepAliveTimer_->expires_from_now(std::chrono::seconds(keepAliveIntervalInSeconds_));
+
keepAliveTimer_->expires_after(std::chrono::seconds(keepAliveIntervalInSeconds_));
auto weakSelf = weak_from_this();
keepAliveTimer_->async_wait([weakSelf](const ASIO_ERROR&) {
auto self = weakSelf.lock();
@@ -1318,12 +1285,12 @@ void ClientConnection::close(Result result, bool
detach) {
numOfPendingLookupRequest_ = 0;
if (keepAliveTimer_) {
- keepAliveTimer_->cancel();
+ cancelTimer(*keepAliveTimer_);
keepAliveTimer_.reset();
}
if (consumerStatsRequestTimer_) {
- consumerStatsRequestTimer_->cancel();
+ cancelTimer(*consumerStatsRequestTimer_);
consumerStatsRequestTimer_.reset();
}
@@ -1435,7 +1402,7 @@ Future<Result, GetLastMessageIdResponse>
ClientConnection::newGetLastMessageId(u
LastMessageIdRequestData requestData;
requestData.promise = promise;
requestData.timer = executor_->createDeadlineTimer();
- requestData.timer->expires_from_now(operationsTimeout_);
+ requestData.timer->expires_after(operationsTimeout_);
auto weakSelf = weak_from_this();
requestData.timer->async_wait([weakSelf, requestData](const ASIO_ERROR&
ec) {
auto self = weakSelf.lock();
@@ -1483,7 +1450,7 @@ Future<Result, SchemaInfo>
ClientConnection::newGetSchema(const std::string& top
lock.unlock();
auto weakSelf = weak_from_this();
- timer->expires_from_now(operationsTimeout_);
+ timer->expires_after(operationsTimeout_);
timer->async_wait([this, weakSelf, requestId](const ASIO_ERROR& ec) {
auto self = weakSelf.lock();
if (!self) {
@@ -1570,7 +1537,7 @@ void ClientConnection::handleSuccess(const
proto::CommandSuccess& success) {
lock.unlock();
requestData.promise.setValue({});
- requestData.timer->cancel();
+ cancelTimer(*requestData.timer);
}
}
@@ -1582,7 +1549,8 @@ void ClientConnection::handlePartitionedMetadataResponse(
Lock lock(mutex_);
auto it =
pendingLookupRequests_.find(partitionMetadataResponse.request_id());
if (it != pendingLookupRequests_.end()) {
- it->second.timer->cancel();
+ cancelTimer(*it->second.timer);
+
LookupDataResultPromisePtr lookupDataPromise = it->second.promise;
pendingLookupRequests_.erase(it);
numOfPendingLookupRequest_--;
@@ -1661,7 +1629,7 @@ void ClientConnection::handleLookupTopicRespose(
Lock lock(mutex_);
auto it = pendingLookupRequests_.find(lookupTopicResponse.request_id());
if (it != pendingLookupRequests_.end()) {
- it->second.timer->cancel();
+ cancelTimer(*it->second.timer);
LookupDataResultPromisePtr lookupDataPromise = it->second.promise;
pendingLookupRequests_.erase(it);
numOfPendingLookupRequest_--;
@@ -1739,7 +1707,7 @@ void ClientConnection::handleProducerSuccess(const
proto::CommandProducerSuccess
data.topicEpoch = boost::none;
}
requestData.promise.setValue(data);
- requestData.timer->cancel();
+ cancelTimer(*requestData.timer);
}
}
}
@@ -1759,7 +1727,7 @@ void ClientConnection::handleError(const
proto::CommandError& error) {
lock.unlock();
requestData.promise.setFailed(result);
- requestData.timer->cancel();
+ cancelTimer(*requestData.timer);
} else {
PendingGetLastMessageIdRequestsMap::iterator it =
pendingGetLastMessageIdRequests_.find(error.request_id());
@@ -2052,8 +2020,8 @@ void ClientConnection::unsafeRemovePendingRequest(long
requestId) {
auto it = pendingRequests_.find(requestId);
if (it != pendingRequests_.end()) {
it->second.promise.setFailed(ResultDisconnected);
- ASIO_ERROR ec;
- it->second.timer->cancel(ec);
+ cancelTimer(*it->second.timer);
+
pendingRequests_.erase(it);
}
}
diff --git a/lib/ClientConnection.h b/lib/ClientConnection.h
index ff40281..cf6be65 100644
--- a/lib/ClientConnection.h
+++ b/lib/ClientConnection.h
@@ -26,13 +26,13 @@
#include <cstdint>
#ifdef USE_ASIO
#include <asio/bind_executor.hpp>
-#include <asio/io_service.hpp>
+#include <asio/io_context.hpp>
#include <asio/ip/tcp.hpp>
#include <asio/ssl/stream.hpp>
#include <asio/strand.hpp>
#else
#include <boost/asio/bind_executor.hpp>
-#include <boost/asio/io_service.hpp>
+#include <boost/asio/io_context.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/ssl/stream.hpp>
#include <boost/asio/strand.hpp>
@@ -238,7 +238,7 @@ class PULSAR_PUBLIC ClientConnection : public
std::enable_shared_from_this<Clien
* although not usable at this point, since this is just tcp connection
* Pulsar - Connect/Connected has yet to happen
*/
- void handleTcpConnected(const ASIO_ERROR& err,
ASIO::ip::tcp::resolver::iterator endpointIterator);
+ void handleTcpConnected(const ASIO_ERROR& err, const
ASIO::ip::tcp::endpoint& endpoint);
void handleHandshake(const ASIO_ERROR& err);
@@ -261,7 +261,7 @@ class PULSAR_PUBLIC ClientConnection : public
std::enable_shared_from_this<Clien
void handlePulsarConnected(const proto::CommandConnected& cmdConnected);
- void handleResolve(const ASIO_ERROR& err, const
ASIO::ip::tcp::resolver::iterator& endpointIterator);
+ void handleResolve(ASIO_ERROR err, const
ASIO::ip::tcp::resolver::results_type& results);
void handleSend(const ASIO_ERROR& err, const SharedBuffer& cmd);
void handleSendPair(const ASIO_ERROR& err);
@@ -325,7 +325,7 @@ class PULSAR_PUBLIC ClientConnection : public
std::enable_shared_from_this<Clien
*/
SocketPtr socket_;
TlsSocketPtr tlsSocket_;
- ASIO::strand<ASIO::io_service::executor_type> strand_;
+ ASIO::strand<ASIO::io_context::executor_type> strand_;
const std::string logicalAddress_;
/*
diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc
index 401d0a1..325adda 100644
--- a/lib/ConsumerImpl.cc
+++ b/lib/ConsumerImpl.cc
@@ -423,7 +423,7 @@ void ConsumerImpl::discardChunkMessages(const std::string&
uuid, const MessageId
}
void ConsumerImpl::triggerCheckExpiredChunkedTimer() {
-
checkExpiredChunkedTimer_->expires_from_now(milliseconds(expireTimeOfIncompleteChunkedMessageMs_));
+
checkExpiredChunkedTimer_->expires_after(milliseconds(expireTimeOfIncompleteChunkedMessageMs_));
std::weak_ptr<ConsumerImplBase> weakSelf{shared_from_this()};
checkExpiredChunkedTimer_->async_wait([this, weakSelf](const ASIO_ERROR&
ec) -> void {
auto self = weakSelf.lock();
@@ -1690,7 +1690,7 @@ void ConsumerImpl::internalGetLastMessageIdAsync(const
BackoffPtr& backoff, Time
}
remainTime -= next;
- timer->expires_from_now(next);
+ timer->expires_after(next);
auto self = shared_from_this();
timer->async_wait([this, backoff, remainTime, timer, next, callback,
@@ -1814,9 +1814,8 @@ std::shared_ptr<ConsumerImpl>
ConsumerImpl::get_shared_this_ptr() {
}
void ConsumerImpl::cancelTimers() noexcept {
- ASIO_ERROR ec;
- batchReceiveTimer_->cancel(ec);
- checkExpiredChunkedTimer_->cancel(ec);
+ cancelTimer(*batchReceiveTimer_);
+ cancelTimer(*checkExpiredChunkedTimer_);
unAckedMessageTrackerPtr_->stop();
consumerStatsBasePtr_->stop();
}
diff --git a/lib/ConsumerImplBase.cc b/lib/ConsumerImplBase.cc
index 2963a67..171256d 100644
--- a/lib/ConsumerImplBase.cc
+++ b/lib/ConsumerImplBase.cc
@@ -49,7 +49,7 @@ ConsumerImplBase::ConsumerImplBase(const ClientImplPtr&
client, const std::strin
void ConsumerImplBase::triggerBatchReceiveTimerTask(long timeoutMs) {
if (timeoutMs > 0) {
-
batchReceiveTimer_->expires_from_now(std::chrono::milliseconds(timeoutMs));
+
batchReceiveTimer_->expires_after(std::chrono::milliseconds(timeoutMs));
std::weak_ptr<ConsumerImplBase> weakSelf{shared_from_this()};
batchReceiveTimer_->async_wait([weakSelf](const ASIO_ERROR& ec) {
auto self = weakSelf.lock();
diff --git a/lib/ExecutorService.cc b/lib/ExecutorService.cc
index 794e361..99e2393 100644
--- a/lib/ExecutorService.cc
+++ b/lib/ExecutorService.cc
@@ -31,17 +31,16 @@ ExecutorService::~ExecutorService() { close(0); }
void ExecutorService::start() {
auto self = shared_from_this();
std::thread t{[this, self] {
- LOG_DEBUG("Run io_service in a single thread");
- ASIO_ERROR ec;
+ LOG_DEBUG("Run io_context in a single thread");
while (!closed_) {
- io_service_.restart();
- IOService::work work{getIOService()};
- io_service_.run(ec);
- }
- if (ec) {
- LOG_ERROR("Failed to run io_service: " << ec.message());
- } else {
- LOG_DEBUG("Event loop of ExecutorService exits successfully");
+ io_context_.restart();
+ auto work_guard = ASIO::make_work_guard(io_context_);
+ try {
+ io_context_.run();
+ LOG_DEBUG("Event loop of ExecutorService exits successfully");
+ } catch (const ASIO_SYSTEM_ERROR &e) {
+ LOG_ERROR("Failed to run io_context: " << e.what());
+ }
}
{
std::lock_guard<std::mutex> lock{mutex_};
@@ -63,12 +62,12 @@ ExecutorServicePtr ExecutorService::create() {
}
/*
- * factory method of ASIO::ip::tcp::socket associated with io_service_
instance
+ * factory method of ASIO::ip::tcp::socket associated with io_context_
instance
* @ returns shared_ptr to this socket
*/
SocketPtr ExecutorService::createSocket() {
try {
- return SocketPtr(new ASIO::ip::tcp::socket(io_service_));
+ return SocketPtr(new ASIO::ip::tcp::socket(io_context_));
} catch (const ASIO_SYSTEM_ERROR &e) {
restart();
auto error = std::string("Failed to create socket: ") + e.what();
@@ -82,12 +81,12 @@ TlsSocketPtr ExecutorService::createTlsSocket(SocketPtr
&socket, ASIO::ssl::cont
}
/*
- * factory method of Resolver object associated with io_service_ instance
+ * factory method of Resolver object associated with io_context_ instance
* @returns shraed_ptr to resolver object
*/
TcpResolverPtr ExecutorService::createTcpResolver() {
try {
- return TcpResolverPtr(new ASIO::ip::tcp::resolver(io_service_));
+ return TcpResolverPtr(new ASIO::ip::tcp::resolver(io_context_));
} catch (const ASIO_SYSTEM_ERROR &e) {
restart();
auto error = std::string("Failed to create resolver: ") + e.what();
@@ -97,7 +96,7 @@ TcpResolverPtr ExecutorService::createTcpResolver() {
DeadlineTimerPtr ExecutorService::createDeadlineTimer() {
try {
- return DeadlineTimerPtr(new ASIO::steady_timer(io_service_));
+ return DeadlineTimerPtr(new ASIO::steady_timer(io_context_));
} catch (const ASIO_SYSTEM_ERROR &e) {
restart();
auto error = std::string("Failed to create steady_timer: ") + e.what();
@@ -105,7 +104,7 @@ DeadlineTimerPtr ExecutorService::createDeadlineTimer() {
}
}
-void ExecutorService::restart() { io_service_.stop(); }
+void ExecutorService::restart() { io_context_.stop(); }
void ExecutorService::close(long timeoutMs) {
bool expectedState = false;
@@ -113,12 +112,12 @@ void ExecutorService::close(long timeoutMs) {
return;
}
if (timeoutMs == 0) { // non-blocking
- io_service_.stop();
+ io_context_.stop();
return;
}
std::unique_lock<std::mutex> lock{mutex_};
- io_service_.stop();
+ io_context_.stop();
if (timeoutMs > 0) {
cond_.wait_for(lock, std::chrono::milliseconds(timeoutMs), [this] {
return ioServiceDone_; });
} else { // < 0
@@ -126,7 +125,7 @@ void ExecutorService::close(long timeoutMs) {
}
}
-void ExecutorService::postWork(std::function<void(void)> task) {
io_service_.post(task); }
+void ExecutorService::postWork(std::function<void(void)> task) {
ASIO::post(io_context_, std::move(task)); }
/////////////////////
diff --git a/lib/ExecutorService.h b/lib/ExecutorService.h
index 89d06d3..626cb20 100644
--- a/lib/ExecutorService.h
+++ b/lib/ExecutorService.h
@@ -23,11 +23,11 @@
#include <atomic>
#ifdef USE_ASIO
-#include <asio/io_service.hpp>
+#include <asio/io_context.hpp>
#include <asio/ip/tcp.hpp>
#include <asio/ssl.hpp>
#else
-#include <boost/asio/io_service.hpp>
+#include <boost/asio/io_context.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/ssl.hpp>
#endif
@@ -46,7 +46,7 @@ typedef
std::shared_ptr<ASIO::ssl::stream<ASIO::ip::tcp::socket &> > TlsSocketPt
typedef std::shared_ptr<ASIO::ip::tcp::resolver> TcpResolverPtr;
class PULSAR_PUBLIC ExecutorService : public
std::enable_shared_from_this<ExecutorService> {
public:
- using IOService = ASIO::io_service;
+ using IOService = ASIO::io_context;
using SharedPtr = std::shared_ptr<ExecutorService>;
static SharedPtr create();
@@ -67,14 +67,14 @@ class PULSAR_PUBLIC ExecutorService : public
std::enable_shared_from_this<Execut
// See TimeoutProcessor for the semantics of the parameter.
void close(long timeoutMs = 3000);
- IOService &getIOService() { return io_service_; }
+ IOService &getIOService() { return io_context_; }
bool isClosed() const noexcept { return closed_; }
private:
/*
- * io_service is our interface to os, io object schedule async ops on this
object
+ * io_context is our interface to os, io object schedule async ops on this
object
*/
- IOService io_service_;
+ IOService io_context_;
std::atomic_bool closed_{false};
std::mutex mutex_;
diff --git a/lib/HandlerBase.cc b/lib/HandlerBase.cc
index 65aa0db..ffc4e2c 100644
--- a/lib/HandlerBase.cc
+++ b/lib/HandlerBase.cc
@@ -50,9 +50,8 @@ HandlerBase::HandlerBase(const ClientImplPtr& client, const
std::string& topic,
redirectedClusterURI_("") {}
HandlerBase::~HandlerBase() {
- ASIO_ERROR ignored;
- timer_->cancel(ignored);
- creationTimer_->cancel(ignored);
+ cancelTimer(*timer_);
+ cancelTimer(*creationTimer_);
}
void HandlerBase::start() {
@@ -61,15 +60,14 @@ void HandlerBase::start() {
if (state_.compare_exchange_strong(state, Pending)) {
grabCnx();
}
- creationTimer_->expires_from_now(operationTimeut_);
+ creationTimer_->expires_after(operationTimeut_);
std::weak_ptr<HandlerBase> weakSelf{shared_from_this()};
creationTimer_->async_wait([this, weakSelf](const ASIO_ERROR& error) {
auto self = weakSelf.lock();
if (self && !error) {
LOG_WARN("Cancel the pending reconnection due to the start
timeout");
connectionFailed(ResultTimeout);
- ASIO_ERROR ignored;
- timer_->cancel(ignored);
+ cancelTimer(*timer_);
}
});
}
@@ -133,8 +131,7 @@ void HandlerBase::grabCnx(const
boost::optional<std::string>& assignedBrokerUrl)
connectionTimeMs_ =
duration_cast<milliseconds>(high_resolution_clock::now() - before).count();
// Prevent the creationTimer_ from cancelling the timer_
in future
- ASIO_ERROR ignored;
- creationTimer_->cancel(ignored);
+ cancelTimer(*creationTimer_);
LOG_INFO("Finished connecting to broker after " <<
connectionTimeMs_ << " ms")
} else if (isResultRetryable(result)) {
scheduleReconnection();
@@ -188,7 +185,7 @@ void HandlerBase::scheduleReconnection(const
boost::optional<std::string>& assig
TimeDuration delay = assignedBrokerUrl ? std::chrono::milliseconds(0)
: backoff_.next();
LOG_INFO(getName() << "Schedule reconnection in " << (toMillis(delay)
/ 1000.0) << " s");
- timer_->expires_from_now(delay);
+ timer_->expires_after(delay);
// passing shared_ptr here since time_ will get destroyed, so tasks
will be cancelled
// so we will not run into the case where grabCnx is invoked on out of
scope handler
auto name = getName();
diff --git a/lib/JsonUtils.h b/lib/JsonUtils.h
index 841eb5e..2bd4469 100644
--- a/lib/JsonUtils.h
+++ b/lib/JsonUtils.h
@@ -28,14 +28,7 @@ template <typename Ptree>
inline std::string toJson(const Ptree& pt) {
std::ostringstream oss;
boost::property_tree::write_json(oss, pt, false);
- // For Boost < 1.86, boost::property_tree will write a endline at the end
-#if BOOST_VERSION < 108600
- auto s = oss.str();
- s.pop_back();
- return s;
-#else
return oss.str();
-#endif
}
} // namespace pulsar
diff --git a/lib/MultiTopicsConsumerImpl.cc b/lib/MultiTopicsConsumerImpl.cc
index 3ce8e3c..eb3546d 100644
--- a/lib/MultiTopicsConsumerImpl.cc
+++ b/lib/MultiTopicsConsumerImpl.cc
@@ -507,7 +507,7 @@ void MultiTopicsConsumerImpl::closeAsync(const
ResultCallback& originalCallback)
failPendingBatchReceiveCallback();
// cancel timer
- batchReceiveTimer_->cancel();
+ cancelTimer(*batchReceiveTimer_);
}
void MultiTopicsConsumerImpl::messageReceived(const Consumer& consumer, const
Message& msg) {
@@ -973,7 +973,7 @@ uint64_t
MultiTopicsConsumerImpl::getNumberOfConnectedConsumer() {
return numberOfConnectedConsumer;
}
void MultiTopicsConsumerImpl::runPartitionUpdateTask() {
- partitionsUpdateTimer_->expires_from_now(partitionsUpdateInterval_);
+ partitionsUpdateTimer_->expires_after(partitionsUpdateInterval_);
auto weakSelf = weak_from_this();
partitionsUpdateTimer_->async_wait([weakSelf](const ASIO_ERROR& ec) {
// If two requests call runPartitionUpdateTask at the same time, the
timer will fail, and it
@@ -1126,8 +1126,7 @@ void
MultiTopicsConsumerImpl::beforeConnectionChange(ClientConnection& cnx) {
void MultiTopicsConsumerImpl::cancelTimers() noexcept {
if (partitionsUpdateTimer_) {
- ASIO_ERROR ec;
- partitionsUpdateTimer_->cancel(ec);
+ cancelTimer(*partitionsUpdateTimer_);
}
}
diff --git a/lib/Murmur3_32Hash.cc b/lib/Murmur3_32Hash.cc
index 1b214dd..45a4988 100644
--- a/lib/Murmur3_32Hash.cc
+++ b/lib/Murmur3_32Hash.cc
@@ -23,12 +23,8 @@
// the orignal MurmurHash3 source code.
#include "Murmur3_32Hash.h"
-#include <boost/version.hpp>
-#if BOOST_VERSION >= 105500
#include <boost/predef.h>
-#else
-#include <boost/detail/endian.hpp>
-#endif
+
#include <limits>
#if BOOST_COMP_MSVC
diff --git a/lib/NegativeAcksTracker.cc b/lib/NegativeAcksTracker.cc
index ba008a8..e5f439d 100644
--- a/lib/NegativeAcksTracker.cc
+++ b/lib/NegativeAcksTracker.cc
@@ -56,7 +56,7 @@ void NegativeAcksTracker::scheduleTimer() {
return;
}
std::weak_ptr<NegativeAcksTracker> weakSelf{shared_from_this()};
- timer_->expires_from_now(timerInterval_);
+ timer_->expires_after(timerInterval_);
timer_->async_wait([weakSelf](const ASIO_ERROR &ec) {
if (auto self = weakSelf.lock()) {
self->handleTimer(ec);
@@ -135,8 +135,7 @@ void NegativeAcksTracker::add(const MessageId &m) {
void NegativeAcksTracker::close() {
closed_ = true;
- ASIO_ERROR ec;
- timer_->cancel(ec);
+ cancelTimer(*timer_);
std::lock_guard<std::mutex> lock(mutex_);
nackedMessages_.clear();
}
diff --git a/lib/PartitionedProducerImpl.cc b/lib/PartitionedProducerImpl.cc
index 9b2f5c6..4a92366 100644
--- a/lib/PartitionedProducerImpl.cc
+++ b/lib/PartitionedProducerImpl.cc
@@ -421,7 +421,7 @@ void PartitionedProducerImpl::flushAsync(FlushCallback
callback) {
void PartitionedProducerImpl::runPartitionUpdateTask() {
auto weakSelf = weak_from_this();
- partitionsUpdateTimer_->expires_from_now(partitionsUpdateInterval_);
+ partitionsUpdateTimer_->expires_after(partitionsUpdateInterval_);
partitionsUpdateTimer_->async_wait([weakSelf](const ASIO_ERROR& ec) {
auto self = weakSelf.lock();
if (self) {
@@ -524,8 +524,7 @@ uint64_t
PartitionedProducerImpl::getNumberOfConnectedProducer() {
void PartitionedProducerImpl::cancelTimers() noexcept {
if (partitionsUpdateTimer_) {
- ASIO_ERROR ec;
- partitionsUpdateTimer_->cancel(ec);
+ cancelTimer(*partitionsUpdateTimer_);
}
}
diff --git a/lib/PatternMultiTopicsConsumerImpl.cc b/lib/PatternMultiTopicsConsumerImpl.cc
index 9f3fbb9..fd48fee 100644
--- a/lib/PatternMultiTopicsConsumerImpl.cc
+++ b/lib/PatternMultiTopicsConsumerImpl.cc
@@ -48,7 +48,7 @@ const PULSAR_REGEX_NAMESPACE::regex
PatternMultiTopicsConsumerImpl::getPattern()
void PatternMultiTopicsConsumerImpl::resetAutoDiscoveryTimer() {
autoDiscoveryRunning_ = false;
-
autoDiscoveryTimer_->expires_from_now(seconds(conf_.getPatternAutoDiscoveryPeriod()));
+
autoDiscoveryTimer_->expires_after(seconds(conf_.getPatternAutoDiscoveryPeriod()));
auto weakSelf = weak_from_this();
autoDiscoveryTimer_->async_wait([weakSelf](const ASIO_ERROR& err) {
@@ -232,7 +232,7 @@ void PatternMultiTopicsConsumerImpl::start() {
LOG_DEBUG("PatternMultiTopicsConsumerImpl start autoDiscoveryTimer_.");
if (conf_.getPatternAutoDiscoveryPeriod() > 0) {
-
autoDiscoveryTimer_->expires_from_now(seconds(conf_.getPatternAutoDiscoveryPeriod()));
+
autoDiscoveryTimer_->expires_after(seconds(conf_.getPatternAutoDiscoveryPeriod()));
auto weakSelf = weak_from_this();
autoDiscoveryTimer_->async_wait([weakSelf](const ASIO_ERROR& err) {
if (auto self = weakSelf.lock()) {
@@ -252,7 +252,4 @@ void PatternMultiTopicsConsumerImpl::closeAsync(const
ResultCallback& callback)
MultiTopicsConsumerImpl::closeAsync(callback);
}
-void PatternMultiTopicsConsumerImpl::cancelTimers() noexcept {
- ASIO_ERROR ec;
- autoDiscoveryTimer_->cancel(ec);
-}
+void PatternMultiTopicsConsumerImpl::cancelTimers() noexcept { cancelTimer(*autoDiscoveryTimer_); }
diff --git a/lib/PeriodicTask.cc b/lib/PeriodicTask.cc
index 9fde012..c68d23a 100644
--- a/lib/PeriodicTask.cc
+++ b/lib/PeriodicTask.cc
@@ -29,7 +29,7 @@ void PeriodicTask::start() {
state_ = Ready;
if (periodMs_ >= 0) {
std::weak_ptr<PeriodicTask> weakSelf{shared_from_this()};
- timer_->expires_from_now(std::chrono::milliseconds(periodMs_));
+ timer_->expires_after(std::chrono::milliseconds(periodMs_));
timer_->async_wait([weakSelf](const ErrorCode& ec) {
auto self = weakSelf.lock();
if (self) {
@@ -44,8 +44,7 @@ void PeriodicTask::stop() noexcept {
if (!state_.compare_exchange_strong(state, Closing)) {
return;
}
- ErrorCode ec;
- timer_->cancel(ec);
+ cancelTimer(*timer_);
state_ = Pending;
}
@@ -59,7 +58,7 @@ void PeriodicTask::handleTimeout(const ErrorCode& ec) {
// state_ may be changed in handleTimeout, so we check state_ again
if (state_ == Ready) {
auto self = shared_from_this();
- timer_->expires_from_now(std::chrono::milliseconds(periodMs_));
+ timer_->expires_after(std::chrono::milliseconds(periodMs_));
timer_->async_wait([this, self](const ErrorCode& ec) { handleTimeout(ec); });
}
}
diff --git a/lib/ProducerImpl.cc b/lib/ProducerImpl.cc
index cc8f320..21c38c4 100644
--- a/lib/ProducerImpl.cc
+++ b/lib/ProducerImpl.cc
@@ -570,7 +570,7 @@ void ProducerImpl::sendAsyncWithStatsUpdate(const Message&
msg, SendCallback&& c
bool isFirstMessage = batchMessageContainer_->isFirstMessageToAdd(msg);
bool isFull = batchMessageContainer_->add(msg, callback);
if (isFirstMessage) {
- batchTimer_->expires_from_now(milliseconds(conf_.getBatchingMaxPublishDelayMs()));
+ batchTimer_->expires_after(milliseconds(conf_.getBatchingMaxPublishDelayMs()));
auto weakSelf = weak_from_this();
batchTimer_->async_wait([this, weakSelf](const ASIO_ERROR& ec) {
auto self = weakSelf.lock();
@@ -699,7 +699,7 @@ void ProducerImpl::releaseSemaphoreForSendOp(const
OpSendMsg& op) {
PendingFailures ProducerImpl::batchMessageAndSend(const FlushCallback&
flushCallback) {
PendingFailures failures;
LOG_DEBUG("batchMessageAndSend " << *batchMessageContainer_);
- batchTimer_->cancel();
+ cancelTimer(*batchTimer_);
if (batchMessageContainer_->isEmpty()) {
return failures;
}
@@ -1009,9 +1009,8 @@ void ProducerImpl::internalShutdown() {
void ProducerImpl::cancelTimers() noexcept {
dataKeyRefreshTask_.stop();
- ASIO_ERROR ec;
- batchTimer_->cancel(ec);
- sendTimer_->cancel(ec);
+ cancelTimer(*batchTimer_);
+ cancelTimer(*sendTimer_);
}
bool ProducerImplCmp::operator()(const ProducerImplPtr& a, const
ProducerImplPtr& b) const {
@@ -1032,7 +1031,7 @@ void ProducerImpl::startSendTimeoutTimer() {
}
void ProducerImpl::asyncWaitSendTimeout(DurationType expiryTime) {
- sendTimer_->expires_from_now(expiryTime);
+ sendTimer_->expires_after(expiryTime);
auto weakSelf = weak_from_this();
sendTimer_->async_wait([weakSelf](const ASIO_ERROR& err) {
diff --git a/lib/RetryableOperation.h b/lib/RetryableOperation.h
index dba190f..f4b056e 100644
--- a/lib/RetryableOperation.h
+++ b/lib/RetryableOperation.h
@@ -68,8 +68,7 @@ class RetryableOperation : public
std::enable_shared_from_this<RetryableOperatio
void cancel() {
promise_.setFailed(ResultDisconnected);
- ASIO_ERROR ec;
- timer_->cancel(ec);
+ cancelTimer(*timer_);
}
private:
@@ -107,7 +106,7 @@ class RetryableOperation : public
std::enable_shared_from_this<RetryableOperatio
}
auto delay = std::min(backoff_.next(), remainingTime);
- timer_->expires_from_now(delay);
+ timer_->expires_after(delay);
auto nextRemainingTime = remainingTime - delay;
LOG_INFO("Reschedule " << name_ << " for " << toMillis(delay)
diff --git a/lib/SharedBuffer.h b/lib/SharedBuffer.h
index 033802f..c8779dc 100644
--- a/lib/SharedBuffer.h
+++ b/lib/SharedBuffer.h
@@ -151,11 +151,11 @@ class SharedBuffer {
inline bool writable() const { return writableBytes() > 0; }
- ASIO::const_buffers_1 const_asio_buffer() const {
- return ASIO::const_buffers_1(ptr_ + readIdx_, readableBytes());
+ ASIO::const_buffer const_asio_buffer() const {
+ return ASIO::const_buffer(ptr_ + readIdx_, readableBytes());
}
- ASIO::mutable_buffers_1 asio_buffer() {
+ ASIO::mutable_buffer asio_buffer() {
assert(data_);
return ASIO::buffer(ptr_ + writeIdx_, writableBytes());
}
diff --git a/lib/UnAckedMessageTrackerEnabled.cc
b/lib/UnAckedMessageTrackerEnabled.cc
index b61c4c9..3e9ce0e 100644
--- a/lib/UnAckedMessageTrackerEnabled.cc
+++ b/lib/UnAckedMessageTrackerEnabled.cc
@@ -38,7 +38,7 @@ void UnAckedMessageTrackerEnabled::timeoutHandler() {
}
ExecutorServicePtr executorService =
client->getIOExecutorProvider()->get();
timer_ = executorService->createDeadlineTimer();
- timer_->expires_from_now(std::chrono::milliseconds(tickDurationInMs_));
+ timer_->expires_after(std::chrono::milliseconds(tickDurationInMs_));
std::weak_ptr<UnAckedMessageTrackerEnabled> weakSelf{shared_from_this()};
timer_->async_wait([weakSelf](const ASIO_ERROR& ec) {
auto self = weakSelf.lock();
@@ -177,9 +177,8 @@ void UnAckedMessageTrackerEnabled::clear() {
}
void UnAckedMessageTrackerEnabled::stop() {
- ASIO_ERROR ec;
if (timer_) {
- timer_->cancel(ec);
+ cancelTimer(*timer_);
}
}
} /* namespace pulsar */
diff --git a/lib/checksum/crc32c_sse42.cc b/lib/checksum/crc32c_sse42.cc
index 8c52a27..8a24aeb 100644
--- a/lib/checksum/crc32c_sse42.cc
+++ b/lib/checksum/crc32c_sse42.cc
@@ -15,18 +15,8 @@
******************************************************************************/
#include "crc32c_sse42.h"
-#include <boost/version.hpp>
-#if BOOST_VERSION >= 105500
-#include <boost/predef.h>
-#else
-#if _MSC_VER
-#pragma message("Boost version is < 1.55, disable CRC32C")
-#else
-#warning "Boost version is < 1.55, disable CRC32C"
-#endif
-#endif
-
#include <assert.h>
+#include <boost/predef.h>
#include <stdlib.h>
#include "gf2.hpp"
diff --git a/lib/stats/ConsumerStatsImpl.cc b/lib/stats/ConsumerStatsImpl.cc
index 74c7117..3dd1a73 100644
--- a/lib/stats/ConsumerStatsImpl.cc
+++ b/lib/stats/ConsumerStatsImpl.cc
@@ -59,7 +59,7 @@ void ConsumerStatsImpl::flushAndReset(const ASIO_ERROR& ec) {
LOG_INFO(oss.str());
}
-ConsumerStatsImpl::~ConsumerStatsImpl() { timer_->cancel(); }
+ConsumerStatsImpl::~ConsumerStatsImpl() { cancelTimer(*timer_); }
void ConsumerStatsImpl::start() { scheduleTimer(); }
@@ -80,7 +80,7 @@ void ConsumerStatsImpl::messageAcknowledged(Result res,
CommandAck_AckType ackTy
}
void ConsumerStatsImpl::scheduleTimer() {
- timer_->expires_from_now(std::chrono::seconds(statsIntervalInSeconds_));
+ timer_->expires_after(std::chrono::seconds(statsIntervalInSeconds_));
std::weak_ptr<ConsumerStatsImpl> weakSelf{shared_from_this()};
timer_->async_wait([this, weakSelf](const ASIO_ERROR& ec) {
auto self = weakSelf.lock();
diff --git a/lib/stats/ConsumerStatsImpl.h b/lib/stats/ConsumerStatsImpl.h
index 0ce0ef1..726fc7e 100644
--- a/lib/stats/ConsumerStatsImpl.h
+++ b/lib/stats/ConsumerStatsImpl.h
@@ -55,10 +55,7 @@ class ConsumerStatsImpl : public
std::enable_shared_from_this<ConsumerStatsImpl>
ConsumerStatsImpl(const ConsumerStatsImpl& stats);
void flushAndReset(const ASIO_ERROR&);
void start() override;
- void stop() override {
- ASIO_ERROR error;
- timer_->cancel(error);
- }
+ void stop() override { cancelTimer(*timer_); }
void receivedMessage(Message&, Result) override;
void messageAcknowledged(Result, CommandAck_AckType, uint32_t ackNums)
override;
virtual ~ConsumerStatsImpl();
diff --git a/lib/stats/ProducerStatsImpl.cc b/lib/stats/ProducerStatsImpl.cc
index 8aae280..a42532d 100644
--- a/lib/stats/ProducerStatsImpl.cc
+++ b/lib/stats/ProducerStatsImpl.cc
@@ -106,10 +106,10 @@ void ProducerStatsImpl::messageReceived(Result res, const
ptime& publishTime) {
totalSendMap_[res] += 1; // Value will automatically be initialized to 0 in the constructor
}
-ProducerStatsImpl::~ProducerStatsImpl() { timer_->cancel(); }
+ProducerStatsImpl::~ProducerStatsImpl() { cancelTimer(*timer_); }
void ProducerStatsImpl::scheduleTimer() {
- timer_->expires_from_now(std::chrono::seconds(statsIntervalInSeconds_));
+ timer_->expires_after(std::chrono::seconds(statsIntervalInSeconds_));
std::weak_ptr<ProducerStatsImpl> weakSelf{shared_from_this()};
timer_->async_wait([this, weakSelf](const ASIO_ERROR& ec) {
auto self = weakSelf.lock();
diff --git a/lib/stats/ProducerStatsImpl.h b/lib/stats/ProducerStatsImpl.h
index 495a89c..e189c5f 100644
--- a/lib/stats/ProducerStatsImpl.h
+++ b/lib/stats/ProducerStatsImpl.h
@@ -20,17 +20,14 @@
#ifndef PULSAR_PRODUCER_STATS_IMPL_HEADER
#define PULSAR_PRODUCER_STATS_IMPL_HEADER
-#include <map>
-
-#if BOOST_VERSION >= 106400
-#include <boost/serialization/array_wrapper.hpp>
-#endif
#include <boost/accumulators/accumulators.hpp>
#include <boost/accumulators/framework/accumulator_set.hpp>
#include <boost/accumulators/framework/features.hpp>
#include <boost/accumulators/statistics.hpp>
#include <boost/accumulators/statistics/extended_p_square.hpp>
+#include <boost/serialization/array_wrapper.hpp>
#include <iostream>
+#include <map>
#include <memory>
#include <mutex>
#include <vector>
diff --git a/tests/AuthPluginTest.cc b/tests/AuthPluginTest.cc
index e48dbfb..6c6b898 100644
--- a/tests/AuthPluginTest.cc
+++ b/tests/AuthPluginTest.cc
@@ -370,7 +370,7 @@ class SocketStream {
void mockZTS(Latch& latch, int port) {
LOG_INFO("-- MockZTS started");
- ASIO::io_service io;
+ ASIO::io_context io;
ASIO::ip::tcp::acceptor acceptor(io,
ASIO::ip::tcp::endpoint(ASIO::ip::tcp::v4(), port));
LOG_INFO("-- MockZTS waiting for connnection");
diff --git a/tests/ConsumerTest.h b/tests/ConsumerTest.h
index 8248287..9d190c1 100644
--- a/tests/ConsumerTest.h
+++ b/tests/ConsumerTest.h
@@ -46,8 +46,8 @@ class ConsumerTest {
return nullptr;
}
auto timer = cnx->executor_->createDeadlineTimer();
- timer->expires_from_now(delaySinceStartGrabCnx -
- std::chrono::milliseconds(impl->connectionTimeMs_ + 50));
+ timer->expires_after(delaySinceStartGrabCnx -
+ std::chrono::milliseconds(impl->connectionTimeMs_ + 50));
timer->async_wait([cnx](const ASIO_ERROR&) { cnx->close(); });
return timer;
}
diff --git a/vcpkg b/vcpkg
index d6995a0..0d9d468 160000
--- a/vcpkg
+++ b/vcpkg
@@ -1 +1 @@
-Subproject commit d6995a0cf3cafda5e9e52749fad075dd62bfd90c
+Subproject commit 0d9d4684352ba8de70bdf251c6fc9a3c464fa12b
diff --git a/vcpkg.json b/vcpkg.json
index 5ec0e0e..83c926a 100644
--- a/vcpkg.json
+++ b/vcpkg.json
@@ -9,19 +9,19 @@
"features": [
"openssl"
],
- "version>=": "1.28.2"
+ "version>=": "1.32.0"
},
{
"name": "boost-accumulators",
- "version>=": "1.83.0"
+ "version>=": "1.88.0"
},
{
"name": "boost-format",
- "version>=": "1.83.0"
+ "version>=": "1.88.0"
},
{
"name": "boost-property-tree",
- "version>=": "1.83.0"
+ "version>=": "1.88.0"
},
{
"name": "curl",
@@ -61,12 +61,21 @@
}
],
"features": {
+ "boost-asio": {
+ "description": "Use Boost.Asio instead of standalone Asio",
+ "dependencies": [
+ {
+ "name": "boost-asio",
+ "version>=": "1.88.0"
+ }
+ ]
+ },
"perf": {
"description": "Build Performance Tool",
"dependencies": [
{
"name": "boost-program-options",
- "version>=": "1.83.0"
+ "version>=": "1.88.0"
}
]
},
@@ -81,10 +90,6 @@
}
},
"overrides": [
- {
- "name": "asio",
- "version": "1.28.2"
- },
{
"name": "protobuf",
"version": "3.21.12"