Author: aconway Date: Tue May 26 21:41:52 2009 New Revision: 778896 URL: http://svn.apache.org/viewvc?rev=778896&view=rev Log: Improved doOutput algorithm.
Simpler & more robust algorithm based on message count rather than byte size. Self-tuning, removes 2 hard-to-explain cluster options. Similar or marginally better performance. Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterSettings.h qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.h qpid/trunk/qpid/cpp/src/qpid/cluster/PollableQueue.h qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp qpid/trunk/qpid/cpp/xml/cluster.xml Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp?rev=778896&r1=778895&r2=778896&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp Tue May 26 21:41:52 2009 @@ -70,13 +70,8 @@ #if HAVE_LIBCMAN_H ("cluster-cman", optValue(settings.quorum), "Integrate with Cluster Manager (CMAN) cluster.") #endif - ("cluster-read-max", optValue(settings.readMax,"N"), - "Experimental: Limit per-client-connection queue of read buffers. 0=no limit.") - ("cluster-write-estimate", optValue(settings.writeEstimate, "Kb"), - "Experimental: initial estimate for write rate per multicast cycle") - ("cluster-write-min", optValue(settings.writeMin, "Kb"), - "Experimental: minimum estimate for write rate per multicast cycle") - // FIXME aconway 2009-05-20: temporary + ("cluster-read-max", optValue(settings.readMax,"N"), "Experimental: flow-control limit reads per connection. 0=no limit.") + // FIXME aconway 2009-05-20: temporary ("cluster-check-errors", optValue(settings.checkErrors, "yes|no"), "Enable/disable cluster error checks. Normally should be enabled.") ; } Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterSettings.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterSettings.h?rev=778896&r1=778895&r2=778896&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterSettings.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterSettings.h Tue May 26 21:41:52 2009 @@ -32,11 +32,11 @@ std::string name; std::string url; bool quorum; - size_t readMax, writeEstimate, writeMin; + size_t readMax; std::string username, password, mechanism; bool checkErrors; - ClusterSettings() : quorum(false), readMax(10), writeEstimate(1), writeMin(1), + ClusterSettings() : quorum(false), readMax(10), checkErrors(true) // FIXME aconway 2009-05-20: temporary {} Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp?rev=778896&r1=778895&r2=778896&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp Tue May 26 21:41:52 2009 @@ -113,14 +113,6 @@ return output.doOutput(); } -// Delivery of doOutput allows us to run the real connection doOutput() -// which stocks up the write buffers with data. -// -void Connection::deliverDoOutput(uint32_t requested) { - assert(!catchUp); - output.deliverDoOutput(requested); -} - // Received from a directly connected client. void Connection::received(framing::AMQFrame& f) { QPID_LOG(trace, cluster << " RECV " << *this << ": " << f); @@ -279,7 +271,7 @@ QPID_LOG(debug, cluster << " received session state update for " << sessionState().getId()); } -void Connection::shadowReady(uint64_t memberId, uint64_t connectionId, const string& username, const string& fragment) { +void Connection::shadowReady(uint64_t memberId, uint64_t connectionId, const string& username, const string& fragment, uint32_t sendMax) { ConnectionId shadowId = ConnectionId(memberId, connectionId); QPID_LOG(debug, cluster << " catch-up connection " << *this << " becomes shadow " << shadowId); self = shadowId; @@ -287,6 +279,7 @@ // OK to use decoder here because cluster is stalled for update. cluster.getDecoder().get(self).setFragment(fragment.data(), fragment.size()); connection.setErrorListener(this); + output.setSendMax(sendMax); } void Connection::membership(const FieldTable& joiners, const FieldTable& members, uint64_t frameSeq) { Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h?rev=778896&r1=778895&r2=778896&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h Tue May 26 21:41:52 2009 @@ -115,7 +115,7 @@ const framing::SequenceNumber& received, const framing::SequenceSet& unknownCompleted, const SequenceSet& receivedIncomplete); - void shadowReady(uint64_t memberId, uint64_t connectionId, const std::string& username, const std::string& fragment); + void shadowReady(uint64_t memberId, uint64_t connectionId, const std::string& username, const std::string& fragment, uint32_t sendMax); void membership(const framing::FieldTable&, const framing::FieldTable&, uint64_t frameSeq); @@ -150,6 +150,8 @@ void deliverClose(); + OutputInterceptor& getOutput() { return output; } + private: struct NullFrameHandler : public framing::FrameHandler { void handle(framing::AMQFrame&) {} @@ -164,8 +166,7 @@ void init(); bool checkUnsupported(const framing::AMQBody& body); - void deliverDoOutput(uint32_t requested); - void sendDoOutput(); + void deliverDoOutput(uint32_t limit) { output.deliverDoOutput(limit); } boost::shared_ptr<broker::Queue> findQueue(const std::string& qname); broker::SessionState& sessionState(); Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp?rev=778896&r1=778895&r2=778896&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp Tue May 26 21:41:52 2009 @@ -32,17 +32,17 @@ namespace cluster { using namespace framing; +using namespace std; NoOpConnectionOutputHandler OutputInterceptor::discardHandler; OutputInterceptor::OutputInterceptor(Connection& p, sys::ConnectionOutputHandler& h) - : parent(p), closing(false), next(&h), sent(), - estimate(p.getCluster().getSettings().writeEstimate*1024), - minimum(p.getCluster().getSettings().writeMin*1024), - moreOutput(), doingOutput() + : parent(p), closing(false), next(&h), sendMax(1), sent(0), sentDoOutput(false) {} -LATENCY_TRACK(extern sys::LatencyTracker<const AMQBody*> doOutputTracker;) +#if defined QPID_LATENCY_TRACKER +extern sys::LatencyTracker<const AMQBody*> doOutputTracker; +#endif void OutputInterceptor::send(framing::AMQFrame& f) { LATENCY_TRACK(doOutputTracker.finish(f.getBody())); @@ -53,8 +53,6 @@ sys::Mutex::ScopedLock l(lock); next->send(f); } - if (!parent.isCatchUp()) - sent += f.encodedSize(); } void OutputInterceptor::activateOutput() { @@ -62,11 +60,8 @@ sys::Mutex::ScopedLock l(lock); next->activateOutput(); } - else if (!closing) { // Don't send do ouput after output stopped. - QPID_LOG(trace, parent << " activateOutput - sending doOutput"); - moreOutput = true; - sendDoOutput(estimate); - } + else + sendDoOutput(sendMax); } void OutputInterceptor::giveReadCredit(int32_t credit) { @@ -77,43 +72,33 @@ // Called in write thread when the IO layer has no more data to write. // We do nothing in the write thread, we run doOutput only on delivery // of doOutput requests. -bool OutputInterceptor::doOutput() { return false; } - -// Delivery of doOutput allows us to run the real connection doOutput() -// which tranfers frames to the codec for writing. -// -void OutputInterceptor::deliverDoOutput(size_t requested) { - size_t buf = getBuffered(); - if (parent.isLocal()) { // Adjust estimate for next sendDoOutput - sent = sent > buf ? sent - buf : 0; // Buffered data was not sent. - if (buf > 0) // Wrote to capacity, move estimate towards sent. - estimate = (estimate + sent) /2; - else if (sent >= estimate) // Last estimate was too small, increase it. - estimate *= 2; - if (estimate < minimum) estimate = minimum; - } - // Run the real doOutput() till we have added the requested data - // or there's nothing to output. Record how much we send. - sent = 0; - do { - moreOutput = parent.getBrokerConnection().doOutput(); - } while (sent < requested && moreOutput); - sent += buf; // Include data previously in the buffer +bool OutputInterceptor::doOutput() { return false; } +// Send output up to limit, calculate new limit. +void OutputInterceptor::deliverDoOutput(uint32_t limit) { + sentDoOutput = false; + sendMax = limit; + size_t newLimit = limit; if (parent.isLocal()) { - // Send the next doOutput request - doingOutput = false; - sendDoOutput(estimate); // FIXME aconway 2009-04-28: account for data in buffer? + size_t buffered = getBuffered(); + if (buffered == 0 && sent == sendMax) // Could have sent more, increase the limit. + newLimit = sendMax*2; + else if (buffered > 0 && sent > 1) // Data left unsent, reduce the limit. + newLimit = sent-1; } + sent = 0; + while (sent < limit && parent.getBrokerConnection().doOutput()) + ++sent; + if (sent == limit) sendDoOutput(newLimit); } -// Send a doOutput request if one is not already in flight. -void OutputInterceptor::sendDoOutput(size_t request) { - if (!parent.isLocal() || doingOutput || !moreOutput) return; - doingOutput = true; - parent.getCluster().getMulticast().mcastControl( - ClusterConnectionDeliverDoOutputBody(ProtocolVersion(), estimate), parent.getId()); - QPID_LOG(trace, parent << "Send doOutput request for " << request); +void OutputInterceptor::sendDoOutput(size_t newLimit) { + if (parent.isLocal() && !sentDoOutput && !closing) { + sentDoOutput = true; + parent.getCluster().getMulticast().mcastControl( + ClusterConnectionDeliverDoOutputBody(ProtocolVersion(), newLimit), + parent.getId()); + } } void OutputInterceptor::closeOutput() { Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.h?rev=778896&r1=778895&r2=778896&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.h Tue May 26 21:41:52 2009 @@ -49,28 +49,28 @@ size_t getBuffered() const; // Delivery point for doOutput requests. - void deliverDoOutput(size_t requested); + void deliverDoOutput(uint32_t limit); // Intercept doOutput requests on Connection. bool doOutput(); void closeOutput(); + uint32_t getSendMax() const { return sendMax; } + void setSendMax(uint32_t sendMax_) { sendMax=sendMax_; } + cluster::Connection& parent; private: typedef sys::Mutex::ScopedLock Locker; - void sendDoOutput(size_t); + void sendDoOutput(size_t newLimit); mutable sys::Mutex lock; bool closing; sys::ConnectionOutputHandler* next; - size_t sent; - size_t estimate; - size_t minimum; - bool moreOutput; - bool doingOutput; static NoOpConnectionOutputHandler discardHandler; + uint32_t sendMax, sent; + bool sentDoOutput; }; }} // namespace qpid::cluster Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/PollableQueue.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/PollableQueue.h?rev=778896&r1=778895&r2=778896&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/PollableQueue.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/PollableQueue.h Tue May 26 21:41:52 2009 @@ -41,7 +41,8 @@ const boost::shared_ptr<sys::Poller>& poller) : sys::PollableQueue<T>(boost::bind(&PollableQueue<T>::handleBatch, this, _1), poller), - callback(f), error(err), message(msg) {} + callback(f), error(err), message(msg) + {} typename sys::PollableQueue<T>::Batch::const_iterator handleBatch(const typename sys::PollableQueue<T>::Batch& values) { Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp?rev=778896&r1=778895&r2=778896&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp Tue May 26 21:41:52 2009 @@ -253,7 +253,8 @@ updateConnection->getId().getMember(), updateConnection->getId().getNumber(), bc.getUserId(), - string(fragment.first, fragment.second) + string(fragment.first, fragment.second), + updateConnection->getOutput().getSendMax() ); shadowConnection.close(); QPID_LOG(debug, updaterId << " updated connection " << *updateConnection); Modified: qpid/trunk/qpid/cpp/xml/cluster.xml URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/xml/cluster.xml?rev=778896&r1=778895&r2=778896&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/xml/cluster.xml (original) +++ qpid/trunk/qpid/cpp/xml/cluster.xml Tue May 26 21:41:52 2009 @@ -74,7 +74,7 @@ <control name="deliver-close" code="0x2"/> <control name="deliver-do-output" code="0x3"> - <field name="bytes" type="uint32"/> + <field name="limit" type="uint32"/> </control> <!-- Update controls. Sent to a new broker in joining mode. @@ -139,6 +139,7 @@ <field name="connection-id" type="uint64"/> <field name="user-name" type="str8"/> <field name="fragment" type="str32"/> + <field name="send-max" type="uint32"/> </control> <!-- Complete a cluster state update. --> --------------------------------------------------------------------- Apache Qpid - AMQP Messaging Implementation Project: http://qpid.apache.org Use/Interact: mailto:commits-subscr...@qpid.apache.org