This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new 9b86f9e [improve] modify the negativeACK structure to reduce memory
overhead (#497)
9b86f9e is described below
commit 9b86f9e2b037015cc437abc2f5f8b9475846bf95
Author: Guangyang Deng <[email protected]>
AuthorDate: Mon Aug 4 14:07:54 2025 +0800
[improve] modify the negativeACK structure to reduce memory overhead (#497)
---
.github/workflows/ci-pr-validation.yaml | 10 +++----
.github/workflows/codeql-analysis.yml | 2 +-
CMakeLists.txt | 2 ++
LegacyFindPackages.cmake | 22 +++++++++++++++
dependencies.yaml | 1 +
include/pulsar/ConsumerConfiguration.h | 15 +++++++++++
lib/ConsumerConfiguration.cc | 10 +++++++
lib/ConsumerConfigurationImpl.h | 1 +
lib/NegativeAcksTracker.cc | 40 ++++++++++++++++++++++-----
lib/NegativeAcksTracker.h | 11 +++++++-
lib/c/c_ConsumerConfiguration.cc | 10 +++++++
pkg/apk/Dockerfile | 10 +++++++
pkg/deb/Dockerfile | 14 ++++++++--
pkg/mac/build-static-library.sh | 2 +-
tests/BasicEndToEndTest.cc | 48 +++++++++++++++++++++++++++++++++
tests/ConsumerConfigurationTest.cc | 4 +++
vcpkg.json | 4 +++
17 files changed, 190 insertions(+), 16 deletions(-)
diff --git a/.github/workflows/ci-pr-validation.yaml
b/.github/workflows/ci-pr-validation.yaml
index c514009..157cf25 100644
--- a/.github/workflows/ci-pr-validation.yaml
+++ b/.github/workflows/ci-pr-validation.yaml
@@ -167,7 +167,7 @@ jobs:
sudo apt-get install -y libcurl4-openssl-dev libssl-dev \
protobuf-compiler libprotobuf-dev libboost-dev \
libboost-dev libboost-program-options-dev \
- libzstd-dev libsnappy-dev libgmock-dev libgtest-dev
+ libzstd-dev libsnappy-dev libgmock-dev libgtest-dev libroaring-dev
- name: CMake
run: cmake -B build -DBUILD_PERF_TOOLS=ON -DCMAKE_CXX_STANDARD=20
- name: Build
@@ -188,16 +188,16 @@ jobs:
matrix:
include:
- name: 'Windows x64'
- os: windows-2019
+ os: windows-2022
triplet: x64-windows-static
suffix: 'windows-win64'
- generator: 'Visual Studio 16 2019'
+ generator: 'Visual Studio 17 2022'
arch: '-A x64'
- name: 'Windows x86'
- os: windows-2019
+ os: windows-2022
triplet: x86-windows-static
suffix: 'windows-win32'
- generator: 'Visual Studio 16 2019'
+ generator: 'Visual Studio 17 2022'
arch: '-A Win32'
steps:
diff --git a/.github/workflows/codeql-analysis.yml
b/.github/workflows/codeql-analysis.yml
index c877c64..70f833c 100644
--- a/.github/workflows/codeql-analysis.yml
+++ b/.github/workflows/codeql-analysis.yml
@@ -60,7 +60,7 @@ jobs:
sudo apt-get install -y libcurl4-openssl-dev libssl-dev \
protobuf-compiler libprotobuf-dev libboost-dev \
libboost-dev libboost-program-options-dev \
- libzstd-dev libsnappy-dev
+ libzstd-dev libsnappy-dev libroaring-dev
- name: Build
run: |
diff --git a/CMakeLists.txt b/CMakeLists.txt
index d9af806..de9a245 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -124,6 +124,7 @@ if (INTEGRATE_VCPKG)
find_package(protobuf CONFIG REQUIRED)
find_package(zstd CONFIG REQUIRED)
find_package(Snappy CONFIG REQUIRED)
+ find_package(roaring CONFIG REQUIRED)
set(COMMON_LIBS CURL::libcurl
ZLIB::ZLIB
OpenSSL::SSL
@@ -131,6 +132,7 @@ if (INTEGRATE_VCPKG)
protobuf::libprotobuf
$<IF:$<TARGET_EXISTS:zstd::libzstd_shared>,zstd::libzstd_shared,zstd::libzstd_static>
Snappy::snappy
+ roaring::roaring
)
if (USE_ASIO)
find_package(asio CONFIG REQUIRED)
diff --git a/LegacyFindPackages.cmake b/LegacyFindPackages.cmake
index 5004545..2c70ec2 100644
--- a/LegacyFindPackages.cmake
+++ b/LegacyFindPackages.cmake
@@ -117,6 +117,25 @@ if (NOT ZLIB_INCLUDE_DIRS OR NOT ZLIB_LIBRARIES)
message(FATAL_ERROR "Could not find zlib")
endif ()
+find_package(roaring QUIET)
+if (NOT ROARING_FOUND)
+ find_path(ROARING_INCLUDE_DIRS NAMES roaring/roaring.hh)
+ find_library(ROARING_LIBRARIES NAMES roaring libroaring)
+endif ()
+message("ROARING_INCLUDE_DIRS: " ${ROARING_INCLUDE_DIRS})
+message("ROARING_LIBRARIES: " ${ROARING_LIBRARIES})
+if (NOT ROARING_INCLUDE_DIRS OR NOT ROARING_LIBRARIES)
+ message(FATAL_ERROR "Could not find libroaring")
+endif ()
+file(READ "${ROARING_INCLUDE_DIRS}/roaring/roaring.hh" ROARING_HEADER_CONTENTS)
+string(REGEX MATCH "namespace roaring" ROARING_HAS_NAMESPACE
"${ROARING_HEADER_CONTENTS}")
+if (ROARING_HAS_NAMESPACE)
+ message(STATUS "Roaring64Map is in namespace roaring")
+else ()
+ message(STATUS "Roaring64Map is in global namespace")
+ add_definitions(-DROARING_NAMESPACE_GLOBAL)
+endif ()
+
if (LINK_STATIC AND NOT VCPKG_TRIPLET)
find_library(LIB_ZSTD NAMES libzstd.a)
message(STATUS "ZStd: ${LIB_ZSTD}")
@@ -129,6 +148,7 @@ if (LINK_STATIC AND NOT VCPKG_TRIPLET)
elseif (LINK_STATIC AND VCPKG_TRIPLET)
find_package(Protobuf REQUIRED)
message(STATUS "Found protobuf static library: " ${Protobuf_LIBRARIES})
+ find_package(roaring REQUIRED)
if (MSVC AND (${CMAKE_BUILD_TYPE} STREQUAL Debug))
find_library(ZLIB_LIBRARIES NAMES zlibd)
else ()
@@ -231,6 +251,7 @@ include_directories(
${Boost_INCLUDE_DIRS}
${OPENSSL_INCLUDE_DIR}
${ZLIB_INCLUDE_DIRS}
+ ${ROARING_INCLUDE_DIRS}
${CURL_INCLUDE_DIRS}
${Protobuf_INCLUDE_DIRS}
${GTEST_INCLUDE_PATH}
@@ -246,6 +267,7 @@ set(COMMON_LIBS
${CURL_LIBRARIES}
${OPENSSL_LIBRARIES}
${ZLIB_LIBRARIES}
+ ${ROARING_LIBRARIES}
${ADDITIONAL_LIBRARIES}
${CMAKE_DL_LIBS}
)
diff --git a/dependencies.yaml b/dependencies.yaml
index 8d338e4..55087d2 100644
--- a/dependencies.yaml
+++ b/dependencies.yaml
@@ -26,3 +26,4 @@ zstd: 1.5.5
snappy: 1.1.10
openssl: 1.1.1w
curl: 8.6.0
+roaring: 4.3.1
diff --git a/include/pulsar/ConsumerConfiguration.h
b/include/pulsar/ConsumerConfiguration.h
index 3254a95..ee0550c 100644
--- a/include/pulsar/ConsumerConfiguration.h
+++ b/include/pulsar/ConsumerConfiguration.h
@@ -277,6 +277,21 @@ class PULSAR_PUBLIC ConsumerConfiguration {
*/
long getNegativeAckRedeliveryDelayMs() const;
+ /**
+ * Set the precision bit count for negative ack redelivery delay.
+ * The lower bits of the redelivery time will be trimmed to reduce the
memory occupation.
+ * @param negativeAckPrecisionBitCnt
+ * negative ack precision bit count
+ */
+ void setNegativeAckPrecisionBitCnt(int negativeAckPrecisionBitCnt);
+
+ /**
+ * Get the configured precision bit count for negative ack redelivery
delay.
+ *
+ * @return redelivery time precision bit count
+ */
+ int getNegativeAckPrecisionBitCnt() const;
+
/**
* Set time window in milliseconds for grouping message ACK requests. An
ACK request is not sent
* to broker until the time window reaches its end, or the number of
grouped messages reaches
diff --git a/lib/ConsumerConfiguration.cc b/lib/ConsumerConfiguration.cc
index 60f8fea..3c39cad 100644
--- a/lib/ConsumerConfiguration.cc
+++ b/lib/ConsumerConfiguration.cc
@@ -134,6 +134,16 @@ void ConsumerConfiguration::setAckGroupingTimeMs(long
ackGroupingMillis) {
impl_->ackGroupingTimeMs = ackGroupingMillis;
}
+int ConsumerConfiguration::getNegativeAckPrecisionBitCnt() const { return
impl_->negativeAckPrecisionBitCnt; }
+
+void ConsumerConfiguration::setNegativeAckPrecisionBitCnt(int
negativeAckPrecisionBitCnt) {
+ if (negativeAckPrecisionBitCnt < 0) {
+ throw std::invalid_argument(
+ "Consumer Config Exception: NegativeAckPrecisionBitCnt should be
nonnegative number.");
+ }
+ impl_->negativeAckPrecisionBitCnt = negativeAckPrecisionBitCnt;
+}
+
long ConsumerConfiguration::getAckGroupingTimeMs() const { return
impl_->ackGroupingTimeMs; }
void ConsumerConfiguration::setAckGroupingMaxSize(long maxGroupingSize) {
diff --git a/lib/ConsumerConfigurationImpl.h b/lib/ConsumerConfigurationImpl.h
index 711b6f9..2acbfb4 100644
--- a/lib/ConsumerConfigurationImpl.h
+++ b/lib/ConsumerConfigurationImpl.h
@@ -26,6 +26,7 @@ struct ConsumerConfigurationImpl {
long unAckedMessagesTimeoutMs{0};
long tickDurationInMs{1000};
long negativeAckRedeliveryDelayMs{60000};
+ int negativeAckPrecisionBitCnt{8};
long ackGroupingTimeMs{100};
long ackGroupingMaxSize{1000};
long brokerConsumerStatsCacheTimeInMs{30 * 1000L}; // 30 seconds
diff --git a/lib/NegativeAcksTracker.cc b/lib/NegativeAcksTracker.cc
index 7116b1c..9dc64cd 100644
--- a/lib/NegativeAcksTracker.cc
+++ b/lib/NegativeAcksTracker.cc
@@ -19,14 +19,19 @@
#include "NegativeAcksTracker.h"
+#include <cstdint>
#include <functional>
#include <set>
+#include <utility>
#include "ClientImpl.h"
#include "ConsumerImpl.h"
#include "ExecutorService.h"
#include "LogUtils.h"
#include "MessageIdUtil.h"
+#include "pulsar/MessageBuilder.h"
+#include "pulsar/MessageId.h"
+#include "pulsar/MessageIdBuilder.h"
DECLARE_LOG_OBJECT()
namespace pulsar {
@@ -41,6 +46,7 @@ NegativeAcksTracker::NegativeAcksTracker(const ClientImplPtr
&client, ConsumerIm
nackDelay_ =
std::chrono::milliseconds(std::max(conf.getNegativeAckRedeliveryDelayMs(),
MIN_NACK_DELAY_MILLIS));
timerInterval_ = std::chrono::milliseconds((long)(nackDelay_.count() / 3));
+ nackPrecisionBit_ = conf.getNegativeAckPrecisionBitCnt();
LOG_DEBUG("Created negative ack tracker with delay: " <<
nackDelay_.count() << " ms - Timer interval: "
<<
timerInterval_.count());
}
@@ -75,13 +81,22 @@ void NegativeAcksTracker::handleTimer(const ASIO_ERROR &ec)
{
auto now = Clock::now();
+ // The map is sorted by time, so we can exit immediately when we traverse
to a time that does not match
for (auto it = nackedMessages_.begin(); it != nackedMessages_.end();) {
- if (it->second < now) {
- messagesToRedeliver.insert(it->first);
- it = nackedMessages_.erase(it);
- } else {
- ++it;
+ if (it->first > now) {
+ // We are done with all the messages that need to be redelivered
+ break;
}
+
+ auto ledgerMap = it->second;
+ for (auto ledgerIt = ledgerMap.begin(); ledgerIt != ledgerMap.end();
++ledgerIt) {
+ auto entrySet = ledgerIt->second;
+ for (auto setIt = entrySet.begin(); setIt != entrySet.end();
++setIt) {
+ messagesToRedeliver.insert(
+
MessageIdBuilder().ledgerId(ledgerIt->first).entryId(*setIt).build());
+ }
+ }
+ it = nackedMessages_.erase(it);
}
lock.unlock();
@@ -92,14 +107,27 @@ void NegativeAcksTracker::handleTimer(const ASIO_ERROR
&ec) {
scheduleTimer();
}
+std::chrono::steady_clock::time_point trimLowerBit(const
std::chrono::steady_clock::time_point &tp,
+ int bits) {
+ // get origin timestamp in nanoseconds
+ auto timestamp =
std::chrono::duration_cast<std::chrono::nanoseconds>(tp.time_since_epoch()).count();
+
+ // trim lower bits
+ auto trimmedTimestamp = timestamp & (~((1LL << bits) - 1));
+
+ return
std::chrono::steady_clock::time_point(std::chrono::nanoseconds(trimmedTimestamp));
+}
+
void NegativeAcksTracker::add(const MessageId &m) {
auto msgId = discardBatch(m);
auto now = Clock::now();
{
std::lock_guard<std::mutex> lock{mutex_};
+ auto trimmedTimestamp = trimLowerBit(now + nackDelay_,
nackPrecisionBit_);
+ // If the timestamp is already in the map, we can just add the message
to the existing entry
// Erase batch id to group all nacks from same batch
- nackedMessages_[msgId] = now + nackDelay_;
+
nackedMessages_[trimmedTimestamp][msgId.ledgerId()].add((uint64_t)msgId.entryId());
}
scheduleTimer();
diff --git a/lib/NegativeAcksTracker.h b/lib/NegativeAcksTracker.h
index bf1d931..8199966 100644
--- a/lib/NegativeAcksTracker.h
+++ b/lib/NegativeAcksTracker.h
@@ -27,6 +27,8 @@
#include <map>
#include <memory>
#include <mutex>
+#include <roaring/roaring64map.hh>
+#include <unordered_map>
#include "AsioDefines.h"
#include "AsioTimer.h"
@@ -39,6 +41,12 @@ class ClientImpl;
using ClientImplPtr = std::shared_ptr<ClientImpl>;
class ExecutorService;
using ExecutorServicePtr = std::shared_ptr<ExecutorService>;
+using LedgerId = int64_t;
+#ifdef ROARING_NAMESPACE_GLOBAL
+using ConditionalRoaringMap = Roaring64Map;
+#else
+using ConditionalRoaringMap = roaring::Roaring64Map;
+#endif
class NegativeAcksTracker : public
std::enable_shared_from_this<NegativeAcksTracker> {
public:
@@ -64,8 +72,9 @@ class NegativeAcksTracker : public
std::enable_shared_from_this<NegativeAcksTrac
std::chrono::milliseconds nackDelay_;
std::chrono::milliseconds timerInterval_;
+ int nackPrecisionBit_;
typedef typename std::chrono::steady_clock Clock;
- std::map<MessageId, Clock::time_point> nackedMessages_;
+ std::map<Clock::time_point, std::unordered_map<LedgerId,
ConditionalRoaringMap>> nackedMessages_;
const DeadlineTimerPtr timer_;
std::atomic_bool closed_{false};
diff --git a/lib/c/c_ConsumerConfiguration.cc b/lib/c/c_ConsumerConfiguration.cc
index 248957f..273e7f3 100644
--- a/lib/c/c_ConsumerConfiguration.cc
+++ b/lib/c/c_ConsumerConfiguration.cc
@@ -121,6 +121,16 @@ long pulsar_configure_get_negative_ack_redelivery_delay_ms(
return
consumer_configuration->consumerConfiguration.getNegativeAckRedeliveryDelayMs();
}
+void pulsar_configure_set_negative_ack_precision_bit_cnt(
+ pulsar_consumer_configuration_t *consumer_configuration, int
negativeAckPrecisionBitCnt) {
+
consumer_configuration->consumerConfiguration.setNegativeAckPrecisionBitCnt(negativeAckPrecisionBitCnt);
+}
+
+int pulsar_configure_get_negative_ack_precision_bit_cnt(
+ pulsar_consumer_configuration_t *consumer_configuration) {
+ return
consumer_configuration->consumerConfiguration.getNegativeAckPrecisionBitCnt();
+}
+
void pulsar_configure_set_ack_grouping_time_ms(pulsar_consumer_configuration_t
*consumer_configuration,
long ackGroupingMillis) {
consumer_configuration->consumerConfiguration.setAckGroupingTimeMs(ackGroupingMillis);
diff --git a/pkg/apk/Dockerfile b/pkg/apk/Dockerfile
index d7d8718..db501f0 100644
--- a/pkg/apk/Dockerfile
+++ b/pkg/apk/Dockerfile
@@ -34,6 +34,7 @@ RUN apk add \
python3 \
py3-pip \
perl \
+ git \
sudo
RUN pip3 install pyyaml
@@ -86,6 +87,15 @@ RUN SNAPPY_VERSION=$(dep-version.py snappy) && \
make -j8 && make install && \
rm -rf /snappy-${SNAPPY_VERSION} /${SNAPPY_VERSION}.tar.gz
+# Roaring
+RUN ROARING_VERSION=$(dep-version.py roaring) && \
+ curl -O -L
https://github.com/RoaringBitmap/CRoaring/archive/refs/tags/v${ROARING_VERSION}.tar.gz
&& \
+ tar xfz v${ROARING_VERSION}.tar.gz && \
+ cd CRoaring-${ROARING_VERSION} && \
+ mkdir build && cd build && CXXFLAGS="-fPIC -O3" cmake .. && \
+ make -j8 && make install && \
+ rm -rf /v${ROARING_VERSION}.tar.gz /CRoaring-${ROARING_VERSION}
+
RUN OPENSSL_VERSION=$(dep-version.py openssl) && \
OPENSSL_VERSION_UNDERSCORE=$(echo $OPENSSL_VERSION | sed 's/\./_/g') && \
curl -O -L
https://github.com/openssl/openssl/archive/OpenSSL_${OPENSSL_VERSION_UNDERSCORE}.tar.gz
&& \
diff --git a/pkg/deb/Dockerfile b/pkg/deb/Dockerfile
index 502b093..bbafa8b 100644
--- a/pkg/deb/Dockerfile
+++ b/pkg/deb/Dockerfile
@@ -19,7 +19,7 @@
# Build pulsar client library in Centos with tools to
-FROM debian:10
+FROM debian:11
ARG PLATFORM
@@ -32,7 +32,8 @@ RUN apt-get update -y && \
perl \
dpkg-dev \
python3 \
- python3-pip
+ python3-pip \
+ git
RUN pip3 install pyyaml
@@ -91,6 +92,15 @@ RUN SNAPPY_VERSION=$(dep-version.py snappy) && \
make -j8 && make install && \
rm -rf /snappy-${SNAPPY_VERSION} /${SNAPPY_VERSION}.tar.gz
+# Roaring
+RUN ROARING_VERSION=$(dep-version.py roaring) && \
+ curl -O -L
https://github.com/RoaringBitmap/CRoaring/archive/refs/tags/v${ROARING_VERSION}.tar.gz
&& \
+ tar xfz v${ROARING_VERSION}.tar.gz && \
+ cd CRoaring-${ROARING_VERSION} && \
+ mkdir build && cd build && CXXFLAGS="-fPIC -O3" cmake .. && \
+ make -j8 && make install && \
+ rm -rf /v${ROARING_VERSION}.tar.gz /CRoaring-${ROARING_VERSION}
+
RUN OPENSSL_VERSION=$(dep-version.py openssl) && \
OPENSSL_VERSION_UNDERSCORE=$(echo $OPENSSL_VERSION | sed 's/\./_/g') && \
curl -O -L
https://github.com/openssl/openssl/archive/OpenSSL_${OPENSSL_VERSION_UNDERSCORE}.tar.gz
&& \
diff --git a/pkg/mac/build-static-library.sh b/pkg/mac/build-static-library.sh
index 449222b..32c6042 100755
--- a/pkg/mac/build-static-library.sh
+++ b/pkg/mac/build-static-library.sh
@@ -72,5 +72,5 @@ cp ./build-osx/libpulsarwithdeps.a $INSTALL_DIR/lib/
# Test the libraries
clang++ win-examples/example.cc -o dynamic.out -std=c++11 -arch $ARCH -I
$INSTALL_DIR/include -L $INSTALL_DIR/lib -Wl,-rpath $INSTALL_DIR/lib -lpulsar
./dynamic.out
-clang++ win-examples/example.cc -o static.out -std=c++11 -arch $ARCH -I
$INSTALL_DIR/include $INSTALL_DIR/lib/libpulsarwithdeps.a
+clang++ win-examples/example.cc -o static.out -std=c++11 -arch $ARCH -I
$INSTALL_DIR/include $INSTALL_DIR/lib/libpulsarwithdeps.a
$PWD/build-osx/vcpkg_installed/$VCPKG_TRIPLET/lib/libroaring.a
./static.out
diff --git a/tests/BasicEndToEndTest.cc b/tests/BasicEndToEndTest.cc
index e3c7039..c269538 100644
--- a/tests/BasicEndToEndTest.cc
+++ b/tests/BasicEndToEndTest.cc
@@ -3249,6 +3249,54 @@ TEST(BasicEndToEndTest, testNegativeAcksWithPartitions) {
testNegativeAcks(topicName, true);
}
+void testNegativeAckPrecisionBitCnt(const std::string &topic, int
precisionBitCnt) {
+ constexpr int delayMs = 2000;
+ const int64_t timeDeviation = 1L << precisionBitCnt;
+
+ Client client(lookupUrl);
+
+ Consumer consumer;
+ ConsumerConfiguration conf;
+ conf.setNegativeAckRedeliveryDelayMs(delayMs);
+ conf.setNegativeAckPrecisionBitCnt(precisionBitCnt);
+
+ Result result = client.subscribe(topic, "sub1", conf, consumer);
+ ASSERT_EQ(ResultOk, result);
+
+ Producer producer;
+ ProducerConfiguration producerConf;
+ result = client.createProducer(topic, producerConf, producer);
+ ASSERT_EQ(ResultOk, result);
+
+ Message msg = MessageBuilder().setContent("test-0").build();
+ producer.sendAsync(msg, nullptr);
+ producer.flush();
+
+ // receive and trigger negative ack
+ Message received;
+ consumer.receive(received);
+ consumer.negativeAcknowledge(received);
+
+ int64_t expectedRedeliveryTime = TimeUtils::currentTimeMillis() + delayMs;
+
+ Message redelivered;
+ consumer.receive(redelivered);
+ int64_t now = TimeUtils::currentTimeMillis();
+ ASSERT_GE(now, expectedRedeliveryTime - timeDeviation);
+ ASSERT_EQ(redelivered.getDataAsString(), "test-0");
+
+ consumer.acknowledge(redelivered);
+ client.shutdown();
+}
+
+TEST(BasicEndToEndTest, testNegativeAckPrecisionBitCnt) {
+ for (int precisionBitCnt = 1; precisionBitCnt <= 12; precisionBitCnt++) {
+ std::string topic = "testNegativeAckPrecisionBitCnt-" +
std::to_string(precisionBitCnt) + "-" +
+ std::to_string(time(nullptr));
+ testNegativeAckPrecisionBitCnt(topic, precisionBitCnt);
+ }
+}
+
static long regexTestMessagesReceived = 0;
static void regexMessageListenerFunction(const Consumer &consumer, const
Message &msg) {
diff --git a/tests/ConsumerConfigurationTest.cc
b/tests/ConsumerConfigurationTest.cc
index e44543d..e847f4f 100644
--- a/tests/ConsumerConfigurationTest.cc
+++ b/tests/ConsumerConfigurationTest.cc
@@ -52,6 +52,7 @@ TEST(ConsumerConfigurationTest, testDefaultConfig) {
ASSERT_EQ(conf.getUnAckedMessagesTimeoutMs(), 0);
ASSERT_EQ(conf.getTickDurationInMs(), 1000);
ASSERT_EQ(conf.getNegativeAckRedeliveryDelayMs(), 60000);
+ ASSERT_EQ(conf.getNegativeAckPrecisionBitCnt(), 8);
ASSERT_EQ(conf.getAckGroupingTimeMs(), 100);
ASSERT_EQ(conf.getAckGroupingMaxSize(), 1000);
ASSERT_EQ(conf.getBrokerConsumerStatsCacheTimeInMs(), 30000);
@@ -114,6 +115,9 @@ TEST(ConsumerConfigurationTest, testCustomConfig) {
conf.setNegativeAckRedeliveryDelayMs(10000);
ASSERT_EQ(conf.getNegativeAckRedeliveryDelayMs(), 10000);
+ conf.setNegativeAckPrecisionBitCnt(4);
+ ASSERT_EQ(conf.getNegativeAckPrecisionBitCnt(), 4);
+
conf.setAckGroupingTimeMs(200);
ASSERT_EQ(conf.getAckGroupingTimeMs(), 200);
diff --git a/vcpkg.json b/vcpkg.json
index ec9aabd..5ec0e0e 100644
--- a/vcpkg.json
+++ b/vcpkg.json
@@ -43,6 +43,10 @@
"name": "protobuf",
"version>=": "3.21.12"
},
+ {
+ "name": "roaring",
+ "version>=": "4.3.1"
+ },
{
"name": "snappy",
"version>=": "1.1.10"