This is an automated email from the ASF dual-hosted git repository.

zhaijia pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 74b3564a7060041c0091ed31a794a8d190330614
Author: k2la <[email protected]>
AuthorDate: Tue Mar 3 04:55:33 2020 +0900

    [Issue 6168] Fix Unacked Message Tracker by Using Time Partition on C++ 
(#6391)
    
    ### Motivation
    Fixes #6168.
    >On C++ lib, like the following log, unacked messages are redelivered after 
about 2 * unAckedMessagesTimeout.
    
    ### Modifications
    As in #3118, fix `UnackedMessageTracker` by using TimePartition.
    - Add `TickDurationInMs`.
    - Add a `redeliverUnacknowledgedMessages` overload that takes `MessageIds` to 
`ConsumerImpl`, `MultiTopicsConsumerImpl` and `PartitionedConsumerImpl`.
    (cherry picked from commit 333888ad61c062f9e3d2946918ffc21fafd441af)
---
 .../include/pulsar/ConsumerConfiguration.h         |   4 +
 .../include/pulsar/c/producer_configuration.h      |   2 +-
 pulsar-client-cpp/lib/ConsumerConfiguration.cc     |   6 ++
 pulsar-client-cpp/lib/ConsumerConfigurationImpl.h  |   2 +
 pulsar-client-cpp/lib/ConsumerImpl.cc              |  21 +++-
 pulsar-client-cpp/lib/ConsumerImpl.h               |   1 +
 pulsar-client-cpp/lib/ConsumerImplBase.h           |   1 +
 pulsar-client-cpp/lib/LogUtils.cc                  |   2 +-
 pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc   |  25 ++++-
 pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h    |   1 +
 pulsar-client-cpp/lib/PartitionedConsumerImpl.cc   |  24 ++++-
 pulsar-client-cpp/lib/PartitionedConsumerImpl.h    |   1 +
 .../lib/UnAckedMessageTrackerEnabled.cc            | 106 +++++++++++++--------
 .../lib/UnAckedMessageTrackerEnabled.h             |   6 +-
 14 files changed, 152 insertions(+), 50 deletions(-)

diff --git a/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h 
b/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h
index 08b7c54..5468b37 100644
--- a/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h
+++ b/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h
@@ -155,6 +155,10 @@ class PULSAR_PUBLIC ConsumerConfiguration {
      */
     long getUnAckedMessagesTimeoutMs() const;
 
+    void setTickDurationInMs(const uint64_t milliSeconds);
+
+    long getTickDurationInMs() const;
+
     /**
      * Set the delay to wait before re-delivering messages that have failed to 
be process.
      * <p>
diff --git a/pulsar-client-cpp/include/pulsar/c/producer_configuration.h 
b/pulsar-client-cpp/include/pulsar/c/producer_configuration.h
index c846451..1fe44e0 100644
--- a/pulsar-client-cpp/include/pulsar/c/producer_configuration.h
+++ b/pulsar-client-cpp/include/pulsar/c/producer_configuration.h
@@ -178,4 +178,4 @@ PULSAR_PUBLIC void 
pulsar_producer_configuration_set_property(pulsar_producer_co
 
 #ifdef __cplusplus
 }
-#endif
\ No newline at end of file
+#endif
diff --git a/pulsar-client-cpp/lib/ConsumerConfiguration.cc 
b/pulsar-client-cpp/lib/ConsumerConfiguration.cc
index 38fa1fe..546b8b9 100644
--- a/pulsar-client-cpp/lib/ConsumerConfiguration.cc
+++ b/pulsar-client-cpp/lib/ConsumerConfiguration.cc
@@ -98,6 +98,12 @@ void 
ConsumerConfiguration::setUnAckedMessagesTimeoutMs(const uint64_t milliSeco
     impl_->unAckedMessagesTimeoutMs = milliSeconds;
 }
 
+long ConsumerConfiguration::getTickDurationInMs() const { return 
impl_->tickDurationInMs; }
+
+void ConsumerConfiguration::setTickDurationInMs(const uint64_t milliSeconds) {
+    impl_->tickDurationInMs = milliSeconds;
+}
+
 void ConsumerConfiguration::setNegativeAckRedeliveryDelayMs(long 
redeliveryDelayMillis) {
     impl_->negativeAckRedeliveryDelayMs = redeliveryDelayMillis;
 }
diff --git a/pulsar-client-cpp/lib/ConsumerConfigurationImpl.h 
b/pulsar-client-cpp/lib/ConsumerConfigurationImpl.h
index 55dafd3..8dd1263 100644
--- a/pulsar-client-cpp/lib/ConsumerConfigurationImpl.h
+++ b/pulsar-client-cpp/lib/ConsumerConfigurationImpl.h
@@ -27,6 +27,7 @@ namespace pulsar {
 struct ConsumerConfigurationImpl {
     SchemaInfo schemaInfo;
     long unAckedMessagesTimeoutMs;
+    long tickDurationInMs;
 
     long negativeAckRedeliveryDelayMs;
     ConsumerType consumerType;
@@ -45,6 +46,7 @@ struct ConsumerConfigurationImpl {
     ConsumerConfigurationImpl()
         : schemaInfo(),
           unAckedMessagesTimeoutMs(0),
+          tickDurationInMs(1000),
           negativeAckRedeliveryDelayMs(60000),
           consumerType(ConsumerExclusive),
           messageListener(),
diff --git a/pulsar-client-cpp/lib/ConsumerImpl.cc 
b/pulsar-client-cpp/lib/ConsumerImpl.cc
index 9a7e506..a58e09b 100644
--- a/pulsar-client-cpp/lib/ConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/ConsumerImpl.cc
@@ -66,8 +66,13 @@ ConsumerImpl::ConsumerImpl(const ClientImplPtr client, const 
std::string& topic,
     consumerStrStream << "[" << topic_ << ", " << subscription_ << ", " << 
consumerId_ << "] ";
     consumerStr_ = consumerStrStream.str();
     if (conf.getUnAckedMessagesTimeoutMs() != 0) {
-        unAckedMessageTrackerPtr_.reset(
-            new 
UnAckedMessageTrackerEnabled(conf.getUnAckedMessagesTimeoutMs(), client, 
*this));
+        if (conf.getTickDurationInMs() > 0) {
+            unAckedMessageTrackerPtr_.reset(new UnAckedMessageTrackerEnabled(
+                conf.getUnAckedMessagesTimeoutMs(), 
conf.getTickDurationInMs(), client, *this));
+        } else {
+            unAckedMessageTrackerPtr_.reset(
+                new 
UnAckedMessageTrackerEnabled(conf.getUnAckedMessagesTimeoutMs(), client, 
*this));
+        }
     } else {
         unAckedMessageTrackerPtr_.reset(new UnAckedMessageTrackerDisabled());
     }
@@ -954,6 +959,18 @@ Result ConsumerImpl::resumeMessageListener() {
 void ConsumerImpl::redeliverUnacknowledgedMessages() {
     static std::set<MessageId> emptySet;
     redeliverMessages(emptySet);
+    unAckedMessageTrackerPtr_->clear();
+}
+
+void ConsumerImpl::redeliverUnacknowledgedMessages(const std::set<MessageId>& 
messageIds) {
+    if (messageIds.empty()) {
+        return;
+    }
+    if (config_.getConsumerType() != ConsumerShared && 
config_.getConsumerType() != ConsumerKeyShared) {
+        redeliverUnacknowledgedMessages();
+        return;
+    }
+    redeliverMessages(messageIds);
 }
 
 void ConsumerImpl::redeliverMessages(const std::set<MessageId>& messageIds) {
diff --git a/pulsar-client-cpp/lib/ConsumerImpl.h 
b/pulsar-client-cpp/lib/ConsumerImpl.h
index 8a25b49..6d81fd0 100644
--- a/pulsar-client-cpp/lib/ConsumerImpl.h
+++ b/pulsar-client-cpp/lib/ConsumerImpl.h
@@ -98,6 +98,7 @@ class ConsumerImpl : public ConsumerImplBase,
     virtual bool isCumulativeAcknowledgementAllowed(ConsumerType consumerType);
 
     virtual void redeliverMessages(const std::set<MessageId>& messageIds);
+    virtual void redeliverUnacknowledgedMessages(const std::set<MessageId>& 
messageIds);
     virtual void negativeAcknowledge(const MessageId& msgId);
 
     virtual void closeAsync(ResultCallback callback);
diff --git a/pulsar-client-cpp/lib/ConsumerImplBase.h 
b/pulsar-client-cpp/lib/ConsumerImplBase.h
index ab6ed9f..fc15066 100644
--- a/pulsar-client-cpp/lib/ConsumerImplBase.h
+++ b/pulsar-client-cpp/lib/ConsumerImplBase.h
@@ -48,6 +48,7 @@ class ConsumerImplBase {
     virtual Result pauseMessageListener() = 0;
     virtual Result resumeMessageListener() = 0;
     virtual void redeliverUnacknowledgedMessages() = 0;
+    virtual void redeliverUnacknowledgedMessages(const std::set<MessageId>& 
messageIds) = 0;
     virtual const std::string& getName() const = 0;
     virtual int getNumOfPrefetchedMessages() const = 0;
     virtual void getBrokerConsumerStatsAsync(BrokerConsumerStatsCallback 
callback) = 0;
diff --git a/pulsar-client-cpp/lib/LogUtils.cc 
b/pulsar-client-cpp/lib/LogUtils.cc
index e2615a5..e4f6a17 100644
--- a/pulsar-client-cpp/lib/LogUtils.cc
+++ b/pulsar-client-cpp/lib/LogUtils.cc
@@ -55,4 +55,4 @@ std::string LogUtils::getLoggerName(const std::string& path) {
     return path.substr(startIdx + 1, endIdx - startIdx - 1);
 }
 
-}  // namespace pulsar
\ No newline at end of file
+}  // namespace pulsar
diff --git a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc 
b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
index 25addb9..e837adf 100644
--- a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
@@ -45,8 +45,13 @@ 
MultiTopicsConsumerImpl::MultiTopicsConsumerImpl(ClientImplPtr client, const std
     consumerStr_ = consumerStrStream.str();
 
     if (conf.getUnAckedMessagesTimeoutMs() != 0) {
-        unAckedMessageTrackerPtr_.reset(
-            new 
UnAckedMessageTrackerEnabled(conf.getUnAckedMessagesTimeoutMs(), client, 
*this));
+        if (conf.getTickDurationInMs() > 0) {
+            unAckedMessageTrackerPtr_.reset(new UnAckedMessageTrackerEnabled(
+                conf.getUnAckedMessagesTimeoutMs(), 
conf.getTickDurationInMs(), client, *this));
+        } else {
+            unAckedMessageTrackerPtr_.reset(
+                new 
UnAckedMessageTrackerEnabled(conf.getUnAckedMessagesTimeoutMs(), client, 
*this));
+        }
     } else {
         unAckedMessageTrackerPtr_.reset(new UnAckedMessageTrackerDisabled());
     }
@@ -654,6 +659,22 @@ void 
MultiTopicsConsumerImpl::redeliverUnacknowledgedMessages() {
          consumer++) {
         (consumer->second)->redeliverUnacknowledgedMessages();
     }
+    unAckedMessageTrackerPtr_->clear();
+}
+
+void MultiTopicsConsumerImpl::redeliverUnacknowledgedMessages(const 
std::set<MessageId>& messageIds) {
+    if (messageIds.empty()) {
+        return;
+    }
+    if (conf_.getConsumerType() != ConsumerShared && conf_.getConsumerType() 
!= ConsumerKeyShared) {
+        redeliverUnacknowledgedMessages();
+        return;
+    }
+    LOG_DEBUG("Sending RedeliverUnacknowledgedMessages command for partitioned 
consumer.");
+    for (ConsumerMap::const_iterator consumer = consumers_.begin(); consumer 
!= consumers_.end();
+         consumer++) {
+        (consumer->second)->redeliverUnacknowledgedMessages(messageIds);
+    }
 }
 
 int MultiTopicsConsumerImpl::getNumOfPrefetchedMessages() const { return 
messages_.size(); }
diff --git a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h 
b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h
index d190664..fa271fe 100644
--- a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h
+++ b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h
@@ -69,6 +69,7 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase,
     virtual Result pauseMessageListener();
     virtual Result resumeMessageListener();
     virtual void redeliverUnacknowledgedMessages();
+    virtual void redeliverUnacknowledgedMessages(const std::set<MessageId>& 
messageIds);
     virtual int getNumOfPrefetchedMessages() const;
     virtual void getBrokerConsumerStatsAsync(BrokerConsumerStatsCallback 
callback);
     void handleGetConsumerStats(Result, BrokerConsumerStats, LatchPtr, 
MultiTopicsBrokerConsumerStatsPtr,
diff --git a/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc 
b/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc
index 03fd2d2..0241a54 100644
--- a/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc
@@ -43,8 +43,13 @@ 
PartitionedConsumerImpl::PartitionedConsumerImpl(ClientImplPtr client, const std
     consumerStrStream << "[Partitioned Consumer: " << topic_ << "," << 
subscriptionName << ","
                       << numPartitions << "]";
     if (conf.getUnAckedMessagesTimeoutMs() != 0) {
-        unAckedMessageTrackerPtr_.reset(
-            new 
UnAckedMessageTrackerEnabled(conf.getUnAckedMessagesTimeoutMs(), client, 
*this));
+        if (conf.getTickDurationInMs() > 0) {
+            unAckedMessageTrackerPtr_.reset(new UnAckedMessageTrackerEnabled(
+                conf.getUnAckedMessagesTimeoutMs(), 
conf.getTickDurationInMs(), client, *this));
+        } else {
+            unAckedMessageTrackerPtr_.reset(
+                new 
UnAckedMessageTrackerEnabled(conf.getUnAckedMessagesTimeoutMs(), client, 
*this));
+        }
     } else {
         unAckedMessageTrackerPtr_.reset(new UnAckedMessageTrackerDisabled());
     }
@@ -426,6 +431,21 @@ void 
PartitionedConsumerImpl::redeliverUnacknowledgedMessages() {
     for (ConsumerList::const_iterator i = consumers_.begin(); i != 
consumers_.end(); i++) {
         (*i)->redeliverUnacknowledgedMessages();
     }
+    unAckedMessageTrackerPtr_->clear();
+}
+
+void PartitionedConsumerImpl::redeliverUnacknowledgedMessages(const 
std::set<MessageId>& messageIds) {
+    if (messageIds.empty()) {
+        return;
+    }
+    if (conf_.getConsumerType() != ConsumerShared && conf_.getConsumerType() 
!= ConsumerKeyShared) {
+        redeliverUnacknowledgedMessages();
+        return;
+    }
+    LOG_DEBUG("Sending RedeliverUnacknowledgedMessages command for partitioned 
consumer.");
+    for (ConsumerList::const_iterator i = consumers_.begin(); i != 
consumers_.end(); i++) {
+        (*i)->redeliverUnacknowledgedMessages(messageIds);
+    }
 }
 
 const std::string& PartitionedConsumerImpl::getName() const { return 
partitionStr_; }
diff --git a/pulsar-client-cpp/lib/PartitionedConsumerImpl.h 
b/pulsar-client-cpp/lib/PartitionedConsumerImpl.h
index 71b2a30..fb4b047 100644
--- a/pulsar-client-cpp/lib/PartitionedConsumerImpl.h
+++ b/pulsar-client-cpp/lib/PartitionedConsumerImpl.h
@@ -64,6 +64,7 @@ class PartitionedConsumerImpl : public ConsumerImplBase,
     virtual Result pauseMessageListener();
     virtual Result resumeMessageListener();
     virtual void redeliverUnacknowledgedMessages();
+    virtual void redeliverUnacknowledgedMessages(const std::set<MessageId>& 
messageIds);
     virtual const std::string& getName() const;
     virtual int getNumOfPrefetchedMessages() const;
     virtual void getBrokerConsumerStatsAsync(BrokerConsumerStatsCallback 
callback);
diff --git a/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.cc 
b/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.cc
index 2c768b2..7894e64 100644
--- a/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.cc
+++ b/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.cc
@@ -28,7 +28,7 @@ void UnAckedMessageTrackerEnabled::timeoutHandler() {
     timeoutHandlerHelper();
     ExecutorServicePtr executorService = 
client_->getIOExecutorProvider()->get();
     timer_ = executorService->createDeadlineTimer();
-    timer_->expires_from_now(boost::posix_time::milliseconds(timeoutMs_));
+    
timer_->expires_from_now(boost::posix_time::milliseconds(tickDurationInMs_));
     timer_->async_wait([&](const boost::system::error_code& ec) {
         if (ec) {
             LOG_DEBUG("Ignoring timer cancelled event, code[" << ec << "]");
@@ -42,86 +42,112 @@ void UnAckedMessageTrackerEnabled::timeoutHandlerHelper() {
     std::lock_guard<std::mutex> acquire(lock_);
     LOG_DEBUG("UnAckedMessageTrackerEnabled::timeoutHandlerHelper invoked for 
consumerPtr_ "
               << consumerReference_.getName().c_str());
-    if (!oldSet_.empty()) {
+
+    std::set<MessageId> headPartition = timePartitions.front();
+    timePartitions.pop_front();
+
+    std::set<MessageId> msgIdsToRedeliver;
+    if (!headPartition.empty()) {
         LOG_INFO(consumerReference_.getName().c_str()
-                 << ": " << oldSet_.size() << " Messages were not acked within 
" << timeoutMs_ << " time");
-        oldSet_.clear();
-        currentSet_.clear();
-        consumerReference_.redeliverUnacknowledgedMessages();
+                 << ": " << headPartition.size() << " Messages were not acked 
within "
+                 << timePartitions.size() * tickDurationInMs_ << " time");
+        for (auto it = headPartition.begin(); it != headPartition.end(); it++) 
{
+            msgIdsToRedeliver.insert(*it);
+            messageIdPartitionMap.erase(*it);
+        }
+    }
+    headPartition.clear();
+    timePartitions.push_back(headPartition);
+
+    if (msgIdsToRedeliver.size() > 0) {
+        consumerReference_.redeliverUnacknowledgedMessages(msgIdsToRedeliver);
     }
-    oldSet_.swap(currentSet_);
 }
 
 UnAckedMessageTrackerEnabled::UnAckedMessageTrackerEnabled(long timeoutMs, 
const ClientImplPtr client,
                                                            ConsumerImplBase& 
consumer)
     : consumerReference_(consumer) {
+    UnAckedMessageTrackerEnabled(timeoutMs, timeoutMs, client, consumer);
+}
+
+UnAckedMessageTrackerEnabled::UnAckedMessageTrackerEnabled(long timeoutMs, 
long tickDurationInMs,
+                                                           const ClientImplPtr 
client,
+                                                           ConsumerImplBase& 
consumer)
+    : consumerReference_(consumer) {
     timeoutMs_ = timeoutMs;
+    tickDurationInMs_ = (timeoutMs >= tickDurationInMs) ? tickDurationInMs : 
timeoutMs;
     client_ = client;
+
+    int blankPartitions = (int)std::ceil((double)timeoutMs_ / 
tickDurationInMs_);
+    for (int i = 0; i < blankPartitions + 1; i++) {
+        std::set<MessageId> msgIds;
+        timePartitions.push_back(msgIds);
+    }
+
     timeoutHandler();
 }
 
 bool UnAckedMessageTrackerEnabled::add(const MessageId& m) {
     std::lock_guard<std::mutex> acquire(lock_);
-    oldSet_.erase(m);
-    return currentSet_.insert(m).second;
+    if (messageIdPartitionMap.count(m) == 0) {
+        bool insert = messageIdPartitionMap.insert(std::make_pair(m, 
timePartitions.back())).second;
+        return insert && timePartitions.back().insert(m).second;
+    }
+    return false;
 }
 
 bool UnAckedMessageTrackerEnabled::isEmpty() {
     std::lock_guard<std::mutex> acquire(lock_);
-    return oldSet_.empty() && currentSet_.empty();
+    return messageIdPartitionMap.empty();
 }
 
 bool UnAckedMessageTrackerEnabled::remove(const MessageId& m) {
     std::lock_guard<std::mutex> acquire(lock_);
-    return oldSet_.erase(m) || currentSet_.erase(m);
+    bool removed = false;
+    std::map<MessageId, std::set<MessageId>>::iterator exist = 
messageIdPartitionMap.find(m);
+    if (exist != messageIdPartitionMap.end()) {
+        removed = exist->second.erase(m);
+    }
+    return removed;
 }
 
 long UnAckedMessageTrackerEnabled::size() {
     std::lock_guard<std::mutex> acquire(lock_);
-    return oldSet_.size() + currentSet_.size();
+    return messageIdPartitionMap.size();
 }
 
 void UnAckedMessageTrackerEnabled::removeMessagesTill(const MessageId& msgId) {
     std::lock_guard<std::mutex> acquire(lock_);
-    for (std::set<MessageId>::iterator it = oldSet_.begin(); it != 
oldSet_.end();) {
-        if (*it < msgId && it->partition() == msgId.partition()) {
-            oldSet_.erase(it++);
-        } else {
-            it++;
-        }
-    }
-    for (std::set<MessageId>::iterator it = currentSet_.begin(); it != 
currentSet_.end();) {
-        if (*it < msgId && it->partition() == msgId.partition()) {
-            currentSet_.erase(it++);
-        } else {
-            it++;
+    for (auto it = messageIdPartitionMap.begin(); it != 
messageIdPartitionMap.end(); it++) {
+        MessageId msgIdInMap = it->first;
+        if (msgIdInMap < msgId) {
+            std::map<MessageId, std::set<MessageId>>::iterator exist = 
messageIdPartitionMap.find(msgId);
+            if (exist != messageIdPartitionMap.end()) {
+                exist->second.erase(msgId);
+            }
         }
     }
 }
 
 // this is only for MultiTopicsConsumerImpl, when un-subscribe a single topic, 
should remove all it's message.
 void UnAckedMessageTrackerEnabled::removeTopicMessage(const std::string& 
topic) {
-    for (std::set<MessageId>::iterator it = oldSet_.begin(); it != 
oldSet_.end();) {
-        const std::string& topicPartitionName = it->getTopicName();
-        if (topicPartitionName.find(topic) != std::string::npos) {
-            oldSet_.erase(it++);
-        } else {
-            it++;
-        }
-    }
-    for (std::set<MessageId>::iterator it = currentSet_.begin(); it != 
currentSet_.end();) {
-        const std::string& topicPartitionName = it->getTopicName();
-        if (topicPartitionName.find(topic) != std::string::npos) {
-            currentSet_.erase(it++);
-        } else {
-            it++;
+    std::lock_guard<std::mutex> acquire(lock_);
+    for (auto it = messageIdPartitionMap.begin(); it != 
messageIdPartitionMap.end(); it++) {
+        MessageId msgIdInMap = it->first;
+        if (msgIdInMap.getTopicName().compare(topic) == 0) {
+            std::map<MessageId, std::set<MessageId>>::iterator exist = 
messageIdPartitionMap.find(msgIdInMap);
+            if (exist != messageIdPartitionMap.end()) {
+                exist->second.erase(msgIdInMap);
+            }
         }
     }
 }
 
 void UnAckedMessageTrackerEnabled::clear() {
-    currentSet_.clear();
-    oldSet_.clear();
+    messageIdPartitionMap.clear();
+    for (auto it = timePartitions.begin(); it != timePartitions.end(); it++) {
+        it->clear();
+    }
 }
 
 UnAckedMessageTrackerEnabled::~UnAckedMessageTrackerEnabled() {
diff --git a/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.h 
b/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.h
index 921e747..c2b4012 100644
--- a/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.h
+++ b/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.h
@@ -28,6 +28,7 @@ class UnAckedMessageTrackerEnabled : public 
UnAckedMessageTrackerInterface {
    public:
     ~UnAckedMessageTrackerEnabled();
     UnAckedMessageTrackerEnabled(long timeoutMs, const ClientImplPtr, 
ConsumerImplBase&);
+    UnAckedMessageTrackerEnabled(long timeoutMs, long tickDuration, const 
ClientImplPtr, ConsumerImplBase&);
     bool add(const MessageId& m);
     bool remove(const MessageId& m);
     void removeMessagesTill(const MessageId& msgId);
@@ -40,13 +41,14 @@ class UnAckedMessageTrackerEnabled : public 
UnAckedMessageTrackerInterface {
     void timeoutHandlerHelper();
     bool isEmpty();
     long size();
-    std::set<MessageId> currentSet_;
-    std::set<MessageId> oldSet_;
+    std::map<MessageId, std::set<MessageId>> messageIdPartitionMap;
+    std::deque<std::set<MessageId>> timePartitions;
     std::mutex lock_;
     DeadlineTimerPtr timer_;
     ConsumerImplBase& consumerReference_;
     ClientImplPtr client_;
     long timeoutMs_;
+    long tickDurationInMs_;
 };
 }  // namespace pulsar
 

Reply via email to