This is an automated email from the ASF dual-hosted git repository.

gsim pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-cpp.git

commit ea8e59de0e4f6dd3513549befee362c9af1f98ca
Author: Gordon Sim <g...@redhat.com>
AuthorDate: Mon Apr 1 12:32:56 2019 +0100

    QPID-8293: limit the number of messages that can be purged in a single sweep
---
 src/qpid/broker/Broker.cpp      | 11 ++++++++++-
 src/qpid/broker/Broker.h        |  2 ++
 src/qpid/broker/BrokerOptions.h |  1 +
 src/qpid/broker/Queue.cpp       | 26 +++++++++++++++-----------
 src/qpid/broker/Queue.h         |  1 +
 5 files changed, 29 insertions(+), 12 deletions(-)

diff --git a/src/qpid/broker/Broker.cpp b/src/qpid/broker/Broker.cpp
index d214994..48f9675 100644
--- a/src/qpid/broker/Broker.cpp
+++ b/src/qpid/broker/Broker.cpp
@@ -151,7 +151,8 @@ BrokerOptions::BrokerOptions(const std::string& name) :
     dtxDefaultTimeout(60),      // 60s
     dtxMaxTimeout(3600),        // 3600s
     maxNegotiateTime(10000),    // 10s
-    sessionMaxUnacked(5000) 
+    sessionMaxUnacked(5000),
+    maxPurgeBatch(1000)
 {
     int c = sys::SystemInfo::concurrency();
     workerThreads=c+1;
@@ -180,6 +181,7 @@ BrokerOptions::BrokerOptions(const std::string& name) :
         ("mgmt-pub-interval", optValue(mgmtPubInterval, "SECONDS"), 
"Management Publish Interval")
         ("queue-purge-interval", optValue(queueCleanInterval, "SECONDS"),
          "Interval between attempts to purge any expired messages from queues")
+        ("max-purge-batch", optValue(maxPurgeBatch, "MESSAGES"), "maximum number of expired messages queue cleaner will purge in one batch (controls impact on other threads)")
         ("auth", optValue(auth, "yes|no"), "Enable authentication, if disabled 
all incoming connections will be trusted")
         ("realm", optValue(realm, "REALM"), "Use the given realm when 
performing authentication")
         ("sasl-service-name", optValue(saslServiceName, "NAME"), "The service 
name to specify for SASL")
@@ -248,6 +250,7 @@ Broker::Broker(const BrokerOptions& conf) :
     recoveryInProgress(false),
     protocolRegistry(std::set<std::string>(conf.protocols.begin(), conf.protocols.end()), this),
     timestampRcvMsgs(conf.timestampRcvMsgs),
+    maxPurgeBatch(conf.maxPurgeBatch),
     getKnownBrokers(boost::bind(&Broker::getKnownBrokersImpl, this))
 {
     if (!dataDir.isEnabled()) {
@@ -1694,5 +1697,11 @@ void Broker::setLinkClientProperties(const framing::FieldTable& ft) {
     linkClientProperties = ft;
 }
 
+uint32_t Broker::getMaxPurgeBatch() const
+{
+    return maxPurgeBatch;
+}
+
+
 }} // namespace qpid::broker
 
diff --git a/src/qpid/broker/Broker.h b/src/qpid/broker/Broker.h
index 6d5514a..629fc0f 100644
--- a/src/qpid/broker/Broker.h
+++ b/src/qpid/broker/Broker.h
@@ -168,6 +168,7 @@ class Broker : public sys::Runnable, public Plugin::Target,
     mutable sys::Mutex linkClientPropertiesLock;
     framing::FieldTable linkClientProperties;
     bool timestampRcvMsgs;
+    const uint32_t maxPurgeBatch;
 
   public:
     QPID_BROKER_EXTERN virtual ~Broker();
@@ -347,6 +348,7 @@ class Broker : public sys::Runnable, public Plugin::Target,
     uint32_t getDtxMaxTimeout() const;
     uint16_t getQueueThresholdEventRatio() const;
     uint getQueueLimit() const;
+    uint32_t getMaxPurgeBatch() const;
 
     /** Information identifying this system */
     boost::shared_ptr<const System> getSystem() const { return systemObject; }
diff --git a/src/qpid/broker/BrokerOptions.h b/src/qpid/broker/BrokerOptions.h
index 0ef25fb..f4f71d8 100644
--- a/src/qpid/broker/BrokerOptions.h
+++ b/src/qpid/broker/BrokerOptions.h
@@ -79,6 +79,7 @@ struct BrokerOptions : public qpid::Options
     uint32_t maxNegotiateTime;  // Max time in ms for connection with no negotiation
     size_t sessionMaxUnacked;   // Max un-acknowledged outgoing messages per session
     std::string fedTag;
+    uint32_t maxPurgeBatch;
 
 private:
     std::string getHome();
diff --git a/src/qpid/broker/Queue.cpp b/src/qpid/broker/Queue.cpp
index 3b54661..bda8fce 100644
--- a/src/qpid/broker/Queue.cpp
+++ b/src/qpid/broker/Queue.cpp
@@ -205,7 +205,8 @@ Queue::Queue(const string& _name, const QueueSettings& _settings,
     deleted(false),
     barrier(*this),
     allocator(new FifoDistributor( *messages )),
-    redirectSource(false)
+    redirectSource(false),
+    maxPurgeBatch(broker ? broker->getMaxPurgeBatch() : 1000)
 {
     current.setCount(0);//always track depth in messages
     if (settings.maxDepth.getSize()) current.setSize(0);//track depth in bytes only if policy requires it
@@ -664,17 +665,20 @@ void Queue::purgeExpired(sys::Duration lapse) {
     int seconds = int64_t(lapse)/qpid::sys::TIME_SEC;
     if (seconds == 0 || count / seconds < 1) {
         sys::AbsTime time = sys::AbsTime::now();
-        uint32_t count = remove(0, boost::bind(&isExpired, name, _1, time), 0, CONSUMER, settings.autodelete);
-        QPID_LOG(debug, "Purged " << count << " expired messages from " << getName());
-        //
-        // Report the count of discarded-by-ttl messages
-        //
-        if (mgmtObject && count) {
-            mgmtObject->inc_discardsTtl(count);
-            if (brokerMgmtObject) {
-                brokerMgmtObject->inc_discardsTtl(count);
+        uint32_t removed;
+        do {
+            removed = remove(maxPurgeBatch, boost::bind(&isExpired, name, _1, time), 0, CONSUMER, settings.autodelete);
+            QPID_LOG(debug, "Purged " << removed << " expired messages from " << getName());
+            //
+            // Report the count of discarded-by-ttl messages
+            //
+            if (mgmtObject && removed) {
+                mgmtObject->inc_discardsTtl(removed);
+                if (brokerMgmtObject) {
+                    brokerMgmtObject->inc_discardsTtl(removed);
+                }
             }
-        }
+        } while(maxPurgeBatch && removed == maxPurgeBatch);//if we hit the limit, there may be more to purge
     }
 }
 
diff --git a/src/qpid/broker/Queue.h b/src/qpid/broker/Queue.h
index 941af4d..e78b336 100644
--- a/src/qpid/broker/Queue.h
+++ b/src/qpid/broker/Queue.h
@@ -226,6 +226,7 @@ class Queue : public boost::enable_shared_from_this<Queue>,
     // Redirect source and target refer to each other. Only one is source.
     Queue::shared_ptr redirectPeer;
     bool redirectSource;
+    const uint32_t maxPurgeBatch;
 
     bool checkAutoDelete(const qpid::sys::Mutex::ScopedLock&) const;
     bool isUnused(const qpid::sys::Mutex::ScopedLock&) const;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to