This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new 9b86f9e  [improve] modify the negativeACK structure to reduce memory overhead (#497)
9b86f9e is described below

commit 9b86f9e2b037015cc437abc2f5f8b9475846bf95
Author: Guangyang Deng <[email protected]>
AuthorDate: Mon Aug 4 14:07:54 2025 +0800

    [improve] modify the negativeACK structure to reduce memory overhead (#497)
---
 .github/workflows/ci-pr-validation.yaml | 10 +++----
 .github/workflows/codeql-analysis.yml   |  2 +-
 CMakeLists.txt                          |  2 ++
 LegacyFindPackages.cmake                | 22 +++++++++++++++
 dependencies.yaml                       |  1 +
 include/pulsar/ConsumerConfiguration.h  | 15 +++++++++++
 lib/ConsumerConfiguration.cc            | 10 +++++++
 lib/ConsumerConfigurationImpl.h         |  1 +
 lib/NegativeAcksTracker.cc              | 40 ++++++++++++++++++++++-----
 lib/NegativeAcksTracker.h               | 11 +++++++-
 lib/c/c_ConsumerConfiguration.cc        | 10 +++++++
 pkg/apk/Dockerfile                      | 10 +++++++
 pkg/deb/Dockerfile                      | 14 ++++++++--
 pkg/mac/build-static-library.sh         |  2 +-
 tests/BasicEndToEndTest.cc              | 48 +++++++++++++++++++++++++++++++++
 tests/ConsumerConfigurationTest.cc      |  4 +++
 vcpkg.json                              |  4 +++
 17 files changed, 190 insertions(+), 16 deletions(-)

diff --git a/.github/workflows/ci-pr-validation.yaml b/.github/workflows/ci-pr-validation.yaml
index c514009..157cf25 100644
--- a/.github/workflows/ci-pr-validation.yaml
+++ b/.github/workflows/ci-pr-validation.yaml
@@ -167,7 +167,7 @@ jobs:
           sudo apt-get install -y libcurl4-openssl-dev libssl-dev \
             protobuf-compiler libprotobuf-dev libboost-dev \
             libboost-dev libboost-program-options-dev \
-            libzstd-dev libsnappy-dev libgmock-dev libgtest-dev
+            libzstd-dev libsnappy-dev libgmock-dev libgtest-dev libroaring-dev
       - name: CMake
         run: cmake -B build -DBUILD_PERF_TOOLS=ON -DCMAKE_CXX_STANDARD=20
       - name: Build
@@ -188,16 +188,16 @@ jobs:
       matrix:
         include:
           - name: 'Windows x64'
-            os: windows-2019
+            os: windows-2022
             triplet: x64-windows-static
             suffix: 'windows-win64'
-            generator: 'Visual Studio 16 2019'
+            generator: 'Visual Studio 17 2022'
             arch: '-A x64'
           - name: 'Windows x86'
-            os: windows-2019
+            os: windows-2022
             triplet: x86-windows-static
             suffix: 'windows-win32'
-            generator: 'Visual Studio 16 2019'
+            generator: 'Visual Studio 17 2022'
             arch: '-A Win32'
 
     steps:
diff --git a/.github/workflows/codeql-analysis.yml b/.github/workflows/codeql-analysis.yml
index c877c64..70f833c 100644
--- a/.github/workflows/codeql-analysis.yml
+++ b/.github/workflows/codeql-analysis.yml
@@ -60,7 +60,7 @@ jobs:
         sudo apt-get install -y libcurl4-openssl-dev libssl-dev \
           protobuf-compiler libprotobuf-dev libboost-dev \
           libboost-dev libboost-program-options-dev \
-          libzstd-dev libsnappy-dev
+          libzstd-dev libsnappy-dev libroaring-dev
 
     - name: Build
       run: |
diff --git a/CMakeLists.txt b/CMakeLists.txt
index d9af806..de9a245 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -124,6 +124,7 @@ if (INTEGRATE_VCPKG)
     find_package(protobuf CONFIG REQUIRED)
     find_package(zstd CONFIG REQUIRED)
     find_package(Snappy CONFIG REQUIRED)
+    find_package(roaring CONFIG REQUIRED)
     set(COMMON_LIBS CURL::libcurl
         ZLIB::ZLIB
         OpenSSL::SSL
@@ -131,6 +132,7 @@ if (INTEGRATE_VCPKG)
         protobuf::libprotobuf
         $<IF:$<TARGET_EXISTS:zstd::libzstd_shared>,zstd::libzstd_shared,zstd::libzstd_static>
         Snappy::snappy
+        roaring::roaring
         )
     if (USE_ASIO)
         find_package(asio CONFIG REQUIRED)
diff --git a/LegacyFindPackages.cmake b/LegacyFindPackages.cmake
index 5004545..2c70ec2 100644
--- a/LegacyFindPackages.cmake
+++ b/LegacyFindPackages.cmake
@@ -117,6 +117,25 @@ if (NOT ZLIB_INCLUDE_DIRS OR NOT ZLIB_LIBRARIES)
     message(FATAL_ERROR "Could not find zlib")
 endif ()
 
+find_package(roaring QUIET)
+if (NOT ROARING_FOUND)
+    find_path(ROARING_INCLUDE_DIRS NAMES roaring/roaring.hh)
+    find_library(ROARING_LIBRARIES NAMES roaring libroaring)
+endif ()
+message("ROARING_INCLUDE_DIRS: " ${ROARING_INCLUDE_DIRS})
+message("ROARING_LIBRARIES: " ${ROARING_LIBRARIES})
+if (NOT ROARING_INCLUDE_DIRS OR NOT ROARING_LIBRARIES)
+    message(FATAL_ERROR "Could not find libroaring")
+endif ()
+file(READ "${ROARING_INCLUDE_DIRS}/roaring/roaring.hh" ROARING_HEADER_CONTENTS)
+string(REGEX MATCH "namespace roaring" ROARING_HAS_NAMESPACE "${ROARING_HEADER_CONTENTS}")
+if (ROARING_HAS_NAMESPACE)
+    message(STATUS "Roaring64Map is in namespace roaring")
+else ()
+    message(STATUS "Roaring64Map is in global namespace")
+    add_definitions(-DROARING_NAMESPACE_GLOBAL)
+endif ()
+
 if (LINK_STATIC AND NOT VCPKG_TRIPLET)
     find_library(LIB_ZSTD NAMES libzstd.a)
     message(STATUS "ZStd: ${LIB_ZSTD}")
@@ -129,6 +148,7 @@ if (LINK_STATIC AND NOT VCPKG_TRIPLET)
 elseif (LINK_STATIC AND VCPKG_TRIPLET)
     find_package(Protobuf REQUIRED)
     message(STATUS "Found protobuf static library: " ${Protobuf_LIBRARIES})
+    find_package(roaring REQUIRED)
     if (MSVC AND (${CMAKE_BUILD_TYPE} STREQUAL Debug))
         find_library(ZLIB_LIBRARIES NAMES zlibd)
     else ()
@@ -231,6 +251,7 @@ include_directories(
   ${Boost_INCLUDE_DIRS}
   ${OPENSSL_INCLUDE_DIR}
   ${ZLIB_INCLUDE_DIRS}
+  ${ROARING_INCLUDE_DIRS}
   ${CURL_INCLUDE_DIRS}
   ${Protobuf_INCLUDE_DIRS}
   ${GTEST_INCLUDE_PATH}
@@ -246,6 +267,7 @@ set(COMMON_LIBS
   ${CURL_LIBRARIES}
   ${OPENSSL_LIBRARIES}
   ${ZLIB_LIBRARIES}
+  ${ROARING_LIBRARIES}
   ${ADDITIONAL_LIBRARIES}
   ${CMAKE_DL_LIBS}
 )
diff --git a/dependencies.yaml b/dependencies.yaml
index 8d338e4..55087d2 100644
--- a/dependencies.yaml
+++ b/dependencies.yaml
@@ -26,3 +26,4 @@ zstd: 1.5.5
 snappy: 1.1.10
 openssl: 1.1.1w
 curl: 8.6.0
+roaring: 4.3.1
diff --git a/include/pulsar/ConsumerConfiguration.h b/include/pulsar/ConsumerConfiguration.h
index 3254a95..ee0550c 100644
--- a/include/pulsar/ConsumerConfiguration.h
+++ b/include/pulsar/ConsumerConfiguration.h
@@ -277,6 +277,21 @@ class PULSAR_PUBLIC ConsumerConfiguration {
      */
     long getNegativeAckRedeliveryDelayMs() const;
 
+    /**
+     * Set the precision bit count for negative ack redelivery delay.
+     * The lower bits of the redelivery time will be trimmed to reduce the memory occupation.
+     * @param negativeAckPrecisionBitCnt
+     *            negative ack precision bit count
+     */
+    void setNegativeAckPrecisionBitCnt(int negativeAckPrecisionBitCnt);
+
+    /**
+     * Get the configured precision bit count for negative ack redelivery delay.
+     *
+     * @return redelivery time precision bit count
+     */
+    int getNegativeAckPrecisionBitCnt() const;
+
     /**
      * Set time window in milliseconds for grouping message ACK requests. An ACK request is not sent
      * to broker until the time window reaches its end, or the number of grouped messages reaches
diff --git a/lib/ConsumerConfiguration.cc b/lib/ConsumerConfiguration.cc
index 60f8fea..3c39cad 100644
--- a/lib/ConsumerConfiguration.cc
+++ b/lib/ConsumerConfiguration.cc
@@ -134,6 +134,16 @@ void ConsumerConfiguration::setAckGroupingTimeMs(long ackGroupingMillis) {
     impl_->ackGroupingTimeMs = ackGroupingMillis;
 }
 
+int ConsumerConfiguration::getNegativeAckPrecisionBitCnt() const { return impl_->negativeAckPrecisionBitCnt; }
+
+void ConsumerConfiguration::setNegativeAckPrecisionBitCnt(int negativeAckPrecisionBitCnt) {
+    if (negativeAckPrecisionBitCnt < 0) {
+        throw std::invalid_argument(
+            "Consumer Config Exception: NegativeAckPrecisionBitCnt should be nonnegative number.");
+    }
+    impl_->negativeAckPrecisionBitCnt = negativeAckPrecisionBitCnt;
+}
+
 long ConsumerConfiguration::getAckGroupingTimeMs() const { return impl_->ackGroupingTimeMs; }
 
 void ConsumerConfiguration::setAckGroupingMaxSize(long maxGroupingSize) {
diff --git a/lib/ConsumerConfigurationImpl.h b/lib/ConsumerConfigurationImpl.h
index 711b6f9..2acbfb4 100644
--- a/lib/ConsumerConfigurationImpl.h
+++ b/lib/ConsumerConfigurationImpl.h
@@ -26,6 +26,7 @@ struct ConsumerConfigurationImpl {
     long unAckedMessagesTimeoutMs{0};
     long tickDurationInMs{1000};
     long negativeAckRedeliveryDelayMs{60000};
+    int negativeAckPrecisionBitCnt{8};
     long ackGroupingTimeMs{100};
     long ackGroupingMaxSize{1000};
     long brokerConsumerStatsCacheTimeInMs{30 * 1000L};  // 30 seconds
diff --git a/lib/NegativeAcksTracker.cc b/lib/NegativeAcksTracker.cc
index 7116b1c..9dc64cd 100644
--- a/lib/NegativeAcksTracker.cc
+++ b/lib/NegativeAcksTracker.cc
@@ -19,14 +19,19 @@
 
 #include "NegativeAcksTracker.h"
 
+#include <cstdint>
 #include <functional>
 #include <set>
+#include <utility>
 
 #include "ClientImpl.h"
 #include "ConsumerImpl.h"
 #include "ExecutorService.h"
 #include "LogUtils.h"
 #include "MessageIdUtil.h"
+#include "pulsar/MessageBuilder.h"
+#include "pulsar/MessageId.h"
+#include "pulsar/MessageIdBuilder.h"
 DECLARE_LOG_OBJECT()
 
 namespace pulsar {
@@ -41,6 +46,7 @@ NegativeAcksTracker::NegativeAcksTracker(const ClientImplPtr &client, ConsumerIm
     nackDelay_ =
         std::chrono::milliseconds(std::max(conf.getNegativeAckRedeliveryDelayMs(), MIN_NACK_DELAY_MILLIS));
     timerInterval_ = std::chrono::milliseconds((long)(nackDelay_.count() / 3));
+    nackPrecisionBit_ = conf.getNegativeAckPrecisionBitCnt();
     LOG_DEBUG("Created negative ack tracker with delay: " << nackDelay_.count() << " ms - Timer interval: "
                                                            << timerInterval_.count());
 }
@@ -75,13 +81,22 @@ void NegativeAcksTracker::handleTimer(const ASIO_ERROR &ec) {
 
     auto now = Clock::now();
 
+    // The map is sorted by time, so we can exit immediately when we traverse to a time that does not match
     for (auto it = nackedMessages_.begin(); it != nackedMessages_.end();) {
-        if (it->second < now) {
-            messagesToRedeliver.insert(it->first);
-            it = nackedMessages_.erase(it);
-        } else {
-            ++it;
+        if (it->first > now) {
+            // We are done with all the messages that need to be redelivered
+            break;
         }
+
+        auto ledgerMap = it->second;
+        for (auto ledgerIt = ledgerMap.begin(); ledgerIt != ledgerMap.end(); ++ledgerIt) {
+            auto entrySet = ledgerIt->second;
+            for (auto setIt = entrySet.begin(); setIt != entrySet.end(); ++setIt) {
+                messagesToRedeliver.insert(
+                    MessageIdBuilder().ledgerId(ledgerIt->first).entryId(*setIt).build());
+            }
+        }
+        it = nackedMessages_.erase(it);
     }
     lock.unlock();
 
@@ -92,14 +107,27 @@ void NegativeAcksTracker::handleTimer(const ASIO_ERROR &ec) {
     scheduleTimer();
 }
 
+std::chrono::steady_clock::time_point trimLowerBit(const std::chrono::steady_clock::time_point &tp,
+                                                   int bits) {
+    // get origin timestamp in nanoseconds
+    auto timestamp = std::chrono::duration_cast<std::chrono::nanoseconds>(tp.time_since_epoch()).count();
+
+    // trim lower bits
+    auto trimmedTimestamp = timestamp & (~((1LL << bits) - 1));
+
+    return std::chrono::steady_clock::time_point(std::chrono::nanoseconds(trimmedTimestamp));
+}
+
 void NegativeAcksTracker::add(const MessageId &m) {
     auto msgId = discardBatch(m);
     auto now = Clock::now();
 
     {
         std::lock_guard<std::mutex> lock{mutex_};
+        auto trimmedTimestamp = trimLowerBit(now + nackDelay_, nackPrecisionBit_);
+        // If the timestamp is already in the map, we can just add the message to the existing entry
         // Erase batch id to group all nacks from same batch
-        nackedMessages_[msgId] = now + nackDelay_;
+        nackedMessages_[trimmedTimestamp][msgId.ledgerId()].add((uint64_t)msgId.entryId());
     }
 
     scheduleTimer();
diff --git a/lib/NegativeAcksTracker.h b/lib/NegativeAcksTracker.h
index bf1d931..8199966 100644
--- a/lib/NegativeAcksTracker.h
+++ b/lib/NegativeAcksTracker.h
@@ -27,6 +27,8 @@
 #include <map>
 #include <memory>
 #include <mutex>
+#include <roaring/roaring64map.hh>
+#include <unordered_map>
 
 #include "AsioDefines.h"
 #include "AsioTimer.h"
@@ -39,6 +41,12 @@ class ClientImpl;
 using ClientImplPtr = std::shared_ptr<ClientImpl>;
 class ExecutorService;
 using ExecutorServicePtr = std::shared_ptr<ExecutorService>;
+using LedgerId = int64_t;
+#ifdef ROARING_NAMESPACE_GLOBAL
+using ConditionalRoaringMap = Roaring64Map;
+#else
+using ConditionalRoaringMap = roaring::Roaring64Map;
+#endif
 
 class NegativeAcksTracker : public std::enable_shared_from_this<NegativeAcksTracker> {
    public:
@@ -64,8 +72,9 @@ class NegativeAcksTracker : public std::enable_shared_from_this<NegativeAcksTrac
 
     std::chrono::milliseconds nackDelay_;
     std::chrono::milliseconds timerInterval_;
+    int nackPrecisionBit_;
     typedef typename std::chrono::steady_clock Clock;
-    std::map<MessageId, Clock::time_point> nackedMessages_;
+    std::map<Clock::time_point, std::unordered_map<LedgerId, ConditionalRoaringMap>> nackedMessages_;
 
     const DeadlineTimerPtr timer_;
     std::atomic_bool closed_{false};
diff --git a/lib/c/c_ConsumerConfiguration.cc b/lib/c/c_ConsumerConfiguration.cc
index 248957f..273e7f3 100644
--- a/lib/c/c_ConsumerConfiguration.cc
+++ b/lib/c/c_ConsumerConfiguration.cc
@@ -121,6 +121,16 @@ long pulsar_configure_get_negative_ack_redelivery_delay_ms(
     return consumer_configuration->consumerConfiguration.getNegativeAckRedeliveryDelayMs();
 }
 
+void pulsar_configure_set_negative_ack_precision_bit_cnt(
+    pulsar_consumer_configuration_t *consumer_configuration, int negativeAckPrecisionBitCnt) {
+    consumer_configuration->consumerConfiguration.setNegativeAckPrecisionBitCnt(negativeAckPrecisionBitCnt);
+}
+
+int pulsar_configure_get_negative_ack_precision_bit_cnt(
+    pulsar_consumer_configuration_t *consumer_configuration) {
+    return consumer_configuration->consumerConfiguration.getNegativeAckPrecisionBitCnt();
+}
+
 void pulsar_configure_set_ack_grouping_time_ms(pulsar_consumer_configuration_t *consumer_configuration,
                                                long ackGroupingMillis) {
     consumer_configuration->consumerConfiguration.setAckGroupingTimeMs(ackGroupingMillis);
diff --git a/pkg/apk/Dockerfile b/pkg/apk/Dockerfile
index d7d8718..db501f0 100644
--- a/pkg/apk/Dockerfile
+++ b/pkg/apk/Dockerfile
@@ -34,6 +34,7 @@ RUN apk add \
       python3 \
       py3-pip \
       perl \
+      git \
       sudo
 
 RUN pip3 install pyyaml
@@ -86,6 +87,15 @@ RUN SNAPPY_VERSION=$(dep-version.py snappy) && \
     make -j8 && make install && \
     rm -rf /snappy-${SNAPPY_VERSION} /${SNAPPY_VERSION}.tar.gz
 
+# Roaring
+RUN ROARING_VERSION=$(dep-version.py roaring) && \
+    curl -O -L  https://github.com/RoaringBitmap/CRoaring/archive/refs/tags/v${ROARING_VERSION}.tar.gz && \
+    tar xfz v${ROARING_VERSION}.tar.gz && \
+    cd CRoaring-${ROARING_VERSION} && \
+    mkdir build && cd build && CXXFLAGS="-fPIC -O3" cmake .. && \
+    make -j8 && make install && \
+    rm -rf /v${ROARING_VERSION}.tar.gz /CRoaring-${ROARING_VERSION}
+
 RUN OPENSSL_VERSION=$(dep-version.py openssl) && \
     OPENSSL_VERSION_UNDERSCORE=$(echo $OPENSSL_VERSION | sed 's/\./_/g') && \
     curl -O -L https://github.com/openssl/openssl/archive/OpenSSL_${OPENSSL_VERSION_UNDERSCORE}.tar.gz && \
diff --git a/pkg/deb/Dockerfile b/pkg/deb/Dockerfile
index 502b093..bbafa8b 100644
--- a/pkg/deb/Dockerfile
+++ b/pkg/deb/Dockerfile
@@ -19,7 +19,7 @@
 
 # Build pulsar client library in Centos with tools to
 
-FROM debian:10
+FROM debian:11
 
 ARG PLATFORM
 
@@ -32,7 +32,8 @@ RUN apt-get update -y && \
         perl \
         dpkg-dev \
         python3 \
-        python3-pip
+        python3-pip \
+        git
 
 RUN pip3 install pyyaml
 
@@ -91,6 +92,15 @@ RUN SNAPPY_VERSION=$(dep-version.py snappy) && \
     make -j8 && make install && \
     rm -rf /snappy-${SNAPPY_VERSION} /${SNAPPY_VERSION}.tar.gz
 
+# Roaring
+RUN ROARING_VERSION=$(dep-version.py roaring) && \
+    curl -O -L  https://github.com/RoaringBitmap/CRoaring/archive/refs/tags/v${ROARING_VERSION}.tar.gz && \
+    tar xfz v${ROARING_VERSION}.tar.gz && \
+    cd CRoaring-${ROARING_VERSION} && \
+    mkdir build && cd build && CXXFLAGS="-fPIC -O3" cmake .. && \
+    make -j8 && make install && \
+    rm -rf /v${ROARING_VERSION}.tar.gz /CRoaring-${ROARING_VERSION}
+
 RUN OPENSSL_VERSION=$(dep-version.py openssl) && \
     OPENSSL_VERSION_UNDERSCORE=$(echo $OPENSSL_VERSION | sed 's/\./_/g') && \
     curl -O -L https://github.com/openssl/openssl/archive/OpenSSL_${OPENSSL_VERSION_UNDERSCORE}.tar.gz && \
diff --git a/pkg/mac/build-static-library.sh b/pkg/mac/build-static-library.sh
index 449222b..32c6042 100755
--- a/pkg/mac/build-static-library.sh
+++ b/pkg/mac/build-static-library.sh
@@ -72,5 +72,5 @@ cp ./build-osx/libpulsarwithdeps.a $INSTALL_DIR/lib/
 # Test the libraries
 clang++ win-examples/example.cc -o dynamic.out -std=c++11 -arch $ARCH -I $INSTALL_DIR/include -L $INSTALL_DIR/lib -Wl,-rpath $INSTALL_DIR/lib -lpulsar
 ./dynamic.out
-clang++ win-examples/example.cc -o static.out -std=c++11 -arch $ARCH -I $INSTALL_DIR/include $INSTALL_DIR/lib/libpulsarwithdeps.a
+clang++ win-examples/example.cc -o static.out -std=c++11 -arch $ARCH -I $INSTALL_DIR/include $INSTALL_DIR/lib/libpulsarwithdeps.a $PWD/build-osx/vcpkg_installed/$VCPKG_TRIPLET/lib/libroaring.a
 ./static.out
diff --git a/tests/BasicEndToEndTest.cc b/tests/BasicEndToEndTest.cc
index e3c7039..c269538 100644
--- a/tests/BasicEndToEndTest.cc
+++ b/tests/BasicEndToEndTest.cc
@@ -3249,6 +3249,54 @@ TEST(BasicEndToEndTest, testNegativeAcksWithPartitions) {
     testNegativeAcks(topicName, true);
 }
 
+void testNegativeAckPrecisionBitCnt(const std::string &topic, int precisionBitCnt) {
+    constexpr int delayMs = 2000;
+    const int64_t timeDeviation = 1L << precisionBitCnt;
+
+    Client client(lookupUrl);
+
+    Consumer consumer;
+    ConsumerConfiguration conf;
+    conf.setNegativeAckRedeliveryDelayMs(delayMs);
+    conf.setNegativeAckPrecisionBitCnt(precisionBitCnt);
+
+    Result result = client.subscribe(topic, "sub1", conf, consumer);
+    ASSERT_EQ(ResultOk, result);
+
+    Producer producer;
+    ProducerConfiguration producerConf;
+    result = client.createProducer(topic, producerConf, producer);
+    ASSERT_EQ(ResultOk, result);
+
+    Message msg = MessageBuilder().setContent("test-0").build();
+    producer.sendAsync(msg, nullptr);
+    producer.flush();
+
+    // receive and trigger negative ack
+    Message received;
+    consumer.receive(received);
+    consumer.negativeAcknowledge(received);
+
+    int64_t expectedRedeliveryTime = TimeUtils::currentTimeMillis() + delayMs;
+
+    Message redelivered;
+    consumer.receive(redelivered);
+    int64_t now = TimeUtils::currentTimeMillis();
+    ASSERT_GE(now, expectedRedeliveryTime - timeDeviation);
+    ASSERT_EQ(redelivered.getDataAsString(), "test-0");
+
+    consumer.acknowledge(redelivered);
+    client.shutdown();
+}
+
+TEST(BasicEndToEndTest, testNegativeAckPrecisionBitCnt) {
+    for (int precisionBitCnt = 1; precisionBitCnt <= 12; precisionBitCnt++) {
+        std::string topic = "testNegativeAckPrecisionBitCnt-" + std::to_string(precisionBitCnt) + "-" +
+                            std::to_string(time(nullptr));
+        testNegativeAckPrecisionBitCnt(topic, precisionBitCnt);
+    }
+}
+
 static long regexTestMessagesReceived = 0;
 
 static void regexMessageListenerFunction(const Consumer &consumer, const Message &msg) {
diff --git a/tests/ConsumerConfigurationTest.cc b/tests/ConsumerConfigurationTest.cc
index e44543d..e847f4f 100644
--- a/tests/ConsumerConfigurationTest.cc
+++ b/tests/ConsumerConfigurationTest.cc
@@ -52,6 +52,7 @@ TEST(ConsumerConfigurationTest, testDefaultConfig) {
     ASSERT_EQ(conf.getUnAckedMessagesTimeoutMs(), 0);
     ASSERT_EQ(conf.getTickDurationInMs(), 1000);
     ASSERT_EQ(conf.getNegativeAckRedeliveryDelayMs(), 60000);
+    ASSERT_EQ(conf.getNegativeAckPrecisionBitCnt(), 8);
     ASSERT_EQ(conf.getAckGroupingTimeMs(), 100);
     ASSERT_EQ(conf.getAckGroupingMaxSize(), 1000);
     ASSERT_EQ(conf.getBrokerConsumerStatsCacheTimeInMs(), 30000);
@@ -114,6 +115,9 @@ TEST(ConsumerConfigurationTest, testCustomConfig) {
     conf.setNegativeAckRedeliveryDelayMs(10000);
     ASSERT_EQ(conf.getNegativeAckRedeliveryDelayMs(), 10000);
 
+    conf.setNegativeAckPrecisionBitCnt(4);
+    ASSERT_EQ(conf.getNegativeAckPrecisionBitCnt(), 4);
+
     conf.setAckGroupingTimeMs(200);
     ASSERT_EQ(conf.getAckGroupingTimeMs(), 200);
 
diff --git a/vcpkg.json b/vcpkg.json
index ec9aabd..5ec0e0e 100644
--- a/vcpkg.json
+++ b/vcpkg.json
@@ -43,6 +43,10 @@
       "name": "protobuf",
       "version>=": "3.21.12"
     },
+    {
+      "name": "roaring",
+      "version>=": "4.3.1"
+    },
     {
       "name": "snappy",
       "version>=": "1.1.10"

Reply via email to