This is an automated email from the ASF dual-hosted git repository. gsim pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/qpid-cpp.git
commit ea8e59de0e4f6dd3513549befee362c9af1f98ca Author: Gordon Sim <g...@redhat.com> AuthorDate: Mon Apr 1 12:32:56 2019 +0100 QPID-8293: limit the number of messages that can be purged in a single sweep --- src/qpid/broker/Broker.cpp | 11 ++++++++++- src/qpid/broker/Broker.h | 2 ++ src/qpid/broker/BrokerOptions.h | 1 + src/qpid/broker/Queue.cpp | 26 +++++++++++++++----------- src/qpid/broker/Queue.h | 1 + 5 files changed, 29 insertions(+), 12 deletions(-) diff --git a/src/qpid/broker/Broker.cpp b/src/qpid/broker/Broker.cpp index d214994..48f9675 100644 --- a/src/qpid/broker/Broker.cpp +++ b/src/qpid/broker/Broker.cpp @@ -151,7 +151,8 @@ BrokerOptions::BrokerOptions(const std::string& name) : dtxDefaultTimeout(60), // 60s dtxMaxTimeout(3600), // 3600s maxNegotiateTime(10000), // 10s - sessionMaxUnacked(5000) + sessionMaxUnacked(5000), + maxPurgeBatch(1000) { int c = sys::SystemInfo::concurrency(); workerThreads=c+1; @@ -180,6 +181,7 @@ BrokerOptions::BrokerOptions(const std::string& name) : ("mgmt-pub-interval", optValue(mgmtPubInterval, "SECONDS"), "Management Publish Interval") ("queue-purge-interval", optValue(queueCleanInterval, "SECONDS"), "Interval between attempts to purge any expired messages from queues") + ("max-purge-batch", optValue(maxPurgeBatch, "MESSAGES"), "maximum number of expired messages queue clenear will purge in one batch (controls impact on other threads)") ("auth", optValue(auth, "yes|no"), "Enable authentication, if disabled all incoming connections will be trusted") ("realm", optValue(realm, "REALM"), "Use the given realm when performing authentication") ("sasl-service-name", optValue(saslServiceName, "NAME"), "The service name to specify for SASL") @@ -248,6 +250,7 @@ Broker::Broker(const BrokerOptions& conf) : recoveryInProgress(false), protocolRegistry(std::set<std::string>(conf.protocols.begin(), conf.protocols.end()), this), timestampRcvMsgs(conf.timestampRcvMsgs), + maxPurgeBatch(conf.maxPurgeBatch), getKnownBrokers(boost::bind(&Broker::getKnownBrokersImpl, this)) { if (!dataDir.isEnabled()) { @@ -1694,5 +1697,11 @@ void Broker::setLinkClientProperties(const framing::FieldTable& ft) { linkClientProperties = ft; } +uint32_t Broker::getMaxPurgeBatch() const +{ + return maxPurgeBatch; +} + + }} // namespace qpid::broker diff --git a/src/qpid/broker/Broker.h b/src/qpid/broker/Broker.h index 6d5514a..629fc0f 100644 --- a/src/qpid/broker/Broker.h +++ b/src/qpid/broker/Broker.h @@ -168,6 +168,7 @@ class Broker : public sys::Runnable, public Plugin::Target, mutable sys::Mutex linkClientPropertiesLock; framing::FieldTable linkClientProperties; bool timestampRcvMsgs; + const uint32_t maxPurgeBatch; public: QPID_BROKER_EXTERN virtual ~Broker(); @@ -347,6 +348,7 @@ class Broker : public sys::Runnable, public Plugin::Target, uint32_t getDtxMaxTimeout() const; uint16_t getQueueThresholdEventRatio() const; uint getQueueLimit() const; + uint32_t getMaxPurgeBatch() const; /** Information identifying this system */ boost::shared_ptr<const System> getSystem() const { return systemObject; } diff --git a/src/qpid/broker/BrokerOptions.h b/src/qpid/broker/BrokerOptions.h index 0ef25fb..f4f71d8 100644 --- a/src/qpid/broker/BrokerOptions.h +++ b/src/qpid/broker/BrokerOptions.h @@ -79,6 +79,7 @@ struct BrokerOptions : public qpid::Options uint32_t maxNegotiateTime; // Max time in ms for connection with no negotiation size_t sessionMaxUnacked; // Max un-acknowledged outgoing messages per session std::string fedTag; + uint32_t maxPurgeBatch; private: std::string getHome(); diff --git a/src/qpid/broker/Queue.cpp b/src/qpid/broker/Queue.cpp index 3b54661..bda8fce 100644 --- a/src/qpid/broker/Queue.cpp +++ b/src/qpid/broker/Queue.cpp @@ -205,7 +205,8 @@ Queue::Queue(const string& _name, const QueueSettings& _settings, deleted(false), barrier(*this), allocator(new FifoDistributor( *messages )), - redirectSource(false) + redirectSource(false), + maxPurgeBatch(broker ? broker->getMaxPurgeBatch() : 1000) { current.setCount(0);//always track depth in messages if (settings.maxDepth.getSize()) current.setSize(0);//track depth in bytes only if policy requires it @@ -664,17 +665,20 @@ void Queue::purgeExpired(sys::Duration lapse) { int seconds = int64_t(lapse)/qpid::sys::TIME_SEC; if (seconds == 0 || count / seconds < 1) { sys::AbsTime time = sys::AbsTime::now(); - uint32_t count = remove(0, boost::bind(&isExpired, name, _1, time), 0, CONSUMER, settings.autodelete); - QPID_LOG(debug, "Purged " << count << " expired messages from " << getName()); - // - // Report the count of discarded-by-ttl messages - // - if (mgmtObject && count) { - mgmtObject->inc_discardsTtl(count); - if (brokerMgmtObject) { - brokerMgmtObject->inc_discardsTtl(count); + uint32_t removed; + do { + removed = remove(maxPurgeBatch, boost::bind(&isExpired, name, _1, time), 0, CONSUMER, settings.autodelete); + QPID_LOG(debug, "Purged " << removed << " expired messages from " << getName()); + // + // Report the count of discarded-by-ttl messages + // + if (mgmtObject && removed) { + mgmtObject->inc_discardsTtl(removed); + if (brokerMgmtObject) { + brokerMgmtObject->inc_discardsTtl(removed); + } } - } + } while(maxPurgeBatch && removed == maxPurgeBatch);//if we hit the limit, there may be more to purge } } diff --git a/src/qpid/broker/Queue.h b/src/qpid/broker/Queue.h index 941af4d..e78b336 100644 --- a/src/qpid/broker/Queue.h +++ b/src/qpid/broker/Queue.h @@ -226,6 +226,7 @@ class Queue : public boost::enable_shared_from_this<Queue>, // Redirect source and target refer to each other. Only one is source. Queue::shared_ptr redirectPeer; bool redirectSource; + const uint32_t maxPurgeBatch; bool checkAutoDelete(const qpid::sys::Mutex::ScopedLock&) const; bool isUnused(const qpid::sys::Mutex::ScopedLock&) const; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org