Author: aconway Date: Mon May 25 18:20:50 2009 New Revision: 778464 URL: http://svn.apache.org/viewvc?rev=778464&view=rev Log: PollableQueue optimization - replace deque with vector.
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/QueueEvents.cpp qpid/trunk/qpid/cpp/src/qpid/broker/QueueEvents.h qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.cpp qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.h qpid/trunk/qpid/cpp/src/qpid/cluster/PollableQueue.h qpid/trunk/qpid/cpp/src/qpid/sys/PollableQueue.h Modified: qpid/trunk/qpid/cpp/src/qpid/broker/QueueEvents.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/QueueEvents.cpp?rev=778464&r1=778463&r2=778464&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/broker/QueueEvents.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/broker/QueueEvents.cpp Mon May 25 18:20:50 2009 @@ -66,15 +66,14 @@ } } -void QueueEvents::handle(EventQueue::Queue& events) -{ +QueueEvents::EventQueue::Batch::const_iterator +QueueEvents::handle(const EventQueue::Batch& events) { qpid::sys::Mutex::ScopedLock l(lock); - while (!events.empty()) { - for (Listeners::iterator i = listeners.begin(); i != listeners.end(); i++) { - i->second(events.front()); - } - events.pop_front(); + for (EventQueue::Batch::const_iterator i = events.begin(); i != events.end(); ++i) { + for (Listeners::iterator j = listeners.begin(); j != listeners.end(); j++) + j->second(*i); } + return events.end(); } void QueueEvents::shutdown() Modified: qpid/trunk/qpid/cpp/src/qpid/broker/QueueEvents.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/QueueEvents.h?rev=778464&r1=778463&r2=778464&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/broker/QueueEvents.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/broker/QueueEvents.h Mon May 25 18:20:50 2009 @@ -74,7 +74,7 @@ volatile bool enabled; qpid::sys::Mutex lock;//protect listeners from concurrent access - void handle(EventQueue::Queue& e); + EventQueue::Batch::const_iterator handle(const EventQueue::Batch& e); }; }} // namespace qpid::broker Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h?rev=778464&r1=778463&r2=778464&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h Mon May 25 18:20:50 2009 @@ -61,8 +61,6 @@ { public: - typedef sys::PollableQueue<EventFrame> PollableFrameQueue; - /** Local connection. */ Connection(Cluster&, sys::ConnectionOutputHandler& out, const std::string& logId, MemberId, bool catchUp, bool isLink); /** Shadow connection. */ Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp?rev=778464&r1=778463&r2=778464&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp Mon May 25 18:20:50 2009 @@ -87,7 +87,7 @@ return control(framing::AMQFrame(body), cid); } -iovec Event::toIovec() { +iovec Event::toIovec() const { encodeHeader(); iovec iov = { const_cast<char*>(getStore()), getStoreSize() }; return iov; @@ -103,8 +103,8 @@ } // Encode my header in my buffer. -void Event::encodeHeader () { - Buffer b(getStore(), HEADER_SIZE); +void Event::encodeHeader () const { + Buffer b(const_cast<char*>(getStore()), HEADER_SIZE); encode(b); assert(b.getPosition() == HEADER_SIZE); } Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h?rev=778464&r1=778463&r2=778464&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h Mon May 25 18:20:50 2009 @@ -54,7 +54,7 @@ /** Size of payload data, excluding header. */ size_t getSize() const { return size; } /** Size of header + payload. */ - size_t getStoreSize() { return size + HEADER_SIZE; } + size_t getStoreSize() const { return size + HEADER_SIZE; } bool isCluster() const { return connectionId.getNumber() == 0; } bool isConnection() const { return connectionId.getNumber() != 0; } @@ -99,10 +99,10 @@ operator framing::Buffer() const; - iovec toIovec(); + iovec toIovec() const; private: - void encodeHeader(); + void encodeHeader() const; RefCountedBuffer::pointer store; }; Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.cpp?rev=778464&r1=778463&r2=778464&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.cpp Mon May 25 18:20:50 2009 @@ -71,9 +71,9 @@ } -void Multicaster::sendMcast(PollableEventQueue::Queue& values) { +Multicaster::PollableEventQueue::Batch::const_iterator Multicaster::sendMcast(const PollableEventQueue::Batch& values) { try { - PollableEventQueue::Queue::iterator i = values.begin(); + PollableEventQueue::Batch::const_iterator i = values.begin(); while( i != values.end()) { iovec iov = i->toIovec(); if (!cpg.mcast(&iov, 1)) { @@ -82,12 +82,13 @@ } ++i; } - values.erase(values.begin(), i); // Erase sent events. + return i; } catch (const std::exception& e) { QPID_LOG(critical, "Multicast error: " << e.what()); queue.stop(); onError(); + return values.end(); } } Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.h?rev=778464&r1=778463&r2=778464&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.h Mon May 25 18:20:50 2009 @@ -28,6 +28,7 @@ #include "qpid/sys/Mutex.h" #include "qpid/sys/LatencyTracker.h" #include <boost/shared_ptr.hpp> +#include <deque> namespace qpid { @@ -63,7 +64,7 @@ typedef sys::PollableQueue<Event> PollableEventQueue; typedef std::deque<Event> PlainEventQueue; - void sendMcast(PollableEventQueue::Queue& ); + PollableEventQueue::Batch::const_iterator sendMcast(const PollableEventQueue::Batch& ); sys::Mutex lock; boost::function<void()> onError; Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/PollableQueue.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/PollableQueue.h?rev=778464&r1=778463&r2=778464&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/PollableQueue.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/PollableQueue.h Mon May 25 18:20:50 2009 @@ -37,24 +37,27 @@ typedef boost::function<void (const T&)> Callback; typedef boost::function<void()> ErrorCallback; - PollableQueue(Callback f, ErrorCallback err, const std::string& msg, const boost::shared_ptr<sys::Poller>& poller) - : sys::PollableQueue<T>(boost::bind(&PollableQueue<T>::handleBatch, this, _1), poller), + PollableQueue(Callback f, ErrorCallback err, const std::string& msg, + const boost::shared_ptr<sys::Poller>& poller) + : sys::PollableQueue<T>(boost::bind(&PollableQueue<T>::handleBatch, this, _1), + poller), callback(f), error(err), message(msg) {} - void handleBatch(typename sys::PollableQueue<T>::Queue& values) { + typename sys::PollableQueue<T>::Batch::const_iterator + handleBatch(const typename sys::PollableQueue<T>::Batch& values) { try { - typename sys::PollableQueue<T>::Queue::iterator i = values.begin(); + typename sys::PollableQueue<T>::Batch::const_iterator i = values.begin(); while (i != values.end() && !this->isStopped()) { callback(*i); ++i; } - values.erase(values.begin(), i); + return i; } catch (const std::exception& e) { QPID_LOG(error, message << ": " << e.what()); - values.clear(); this->stop(); error(); + return values.end(); } } Modified: qpid/trunk/qpid/cpp/src/qpid/sys/PollableQueue.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/PollableQueue.h?rev=778464&r1=778463&r2=778464&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/sys/PollableQueue.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/sys/PollableQueue.h Mon May 25 18:20:50 2009 @@ -28,7 +28,7 @@ #include <boost/function.hpp> #include <boost/bind.hpp> #include <algorithm> -#include <deque> +#include <vector> namespace qpid { namespace sys { @@ -44,16 +44,18 @@ template <class T> class PollableQueue { public: - typedef std::deque<T> Queue; + typedef std::vector<T> Batch; typedef T value_type; /** * Callback to process a batch of items from the queue. * - * @param values Queue of values to process. Any items remaining + * @param batch Queue of values to process. Any items remaining * on return from Callback are put back on the queue. + * @return iterator pointing to the first un-processed item in batch. + * Items from this point up to batch.end() are put back on the queue. */ - typedef boost::function<void (Queue&)> Callback; + typedef boost::function<typename Batch::const_iterator (const Batch& batch)> Callback; /** * Constructor; sets necessary parameters. @@ -99,7 +101,7 @@ mutable sys::Monitor lock; Callback callback; PollableCondition condition; - Queue queue, batch; + Batch queue, batch; Thread dispatcher; bool stopped; }; @@ -141,17 +143,18 @@ } template <class T> void PollableQueue<T>::process() { + // Called with lock held while (!stopped && !queue.empty()) { assert(batch.empty()); batch.swap(queue); + typename Batch::const_iterator putBack; { ScopedUnlock u(lock); // Allow concurrent push to queue. - callback(batch); - } - if (!batch.empty()) { - queue.insert(queue.begin(), batch.begin(), batch.end()); // put back unprocessed items. - batch.clear(); + putBack = callback(batch); } + // put back unprocessed items. + queue.insert(queue.begin(), putBack, typename Batch::const_iterator(batch.end())); + batch.clear(); } } --------------------------------------------------------------------- Apache Qpid - AMQP Messaging Implementation Project: http://qpid.apache.org Use/Interact: mailto:commits-subscr...@qpid.apache.org