Author: aconway Date: Fri Jul 10 15:42:36 2009 New Revision: 792991 URL: http://svn.apache.org/viewvc?rev=792991&view=rev Log: Fix cluster handling of multiple errors.
If an error occurred while there were frames on the error queue from a previous error, the enqueued frames were not being processed for the new error, which could lead to error-check or config-change frames being missed. Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp qpid/trunk/qpid/cpp/src/qpid/cluster/ErrorCheck.cpp qpid/trunk/qpid/cpp/src/qpid/cluster/ErrorCheck.h Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=792991&r1=792990&r2=792991&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Fri Jul 10 15:42:36 2009 @@ -440,7 +440,7 @@ connection->deliveredFrame(e); } else - QPID_LOG(critical, *this << " FIXME DROP (no connection): " << e); + QPID_LOG(debug, *this << " DROP (no connection): " << e); } else // Drop connection frames while state < CATCHUP QPID_LOG(trace, *this << " DROP (joining): " << e); @@ -534,6 +534,7 @@ void Cluster::configChange(const MemberId&, const std::string& current, Lock& l) { bool memberChange = map.configChange(current); + QPID_LOG(debug, *this << " applied config change: " << map); if (state == LEFT) return; if (!map.isAlive(self)) { // Final config change. 
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/ErrorCheck.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/ErrorCheck.cpp?rev=792991&r1=792990&r2=792991&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/ErrorCheck.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/ErrorCheck.cpp Fri Jul 10 15:42:36 2009 @@ -39,7 +39,7 @@ : cluster(c), mcast(c.getMulticast()), frameSeq(0), type(ERROR_TYPE_NONE), connection(0) {} -ostream& operator<<(ostream& o, ErrorCheck::MemberSet ms) { +ostream& operator<<(ostream& o, const ErrorCheck::MemberSet& ms) { copy(ms.begin(), ms.end(), ostream_iterator<MemberId>(o, " ")); return o; } @@ -60,45 +60,58 @@ << " (unresolved: " << unresolved << ")"); mcast.mcastControl( ClusterErrorCheckBody(ProtocolVersion(), type, frameSeq), c.getId().getMember()); + // If there are already frames queued up by a previous error, review + // them with respect to this new error. + for (FrameQueue::iterator i = frames.begin(); i != frames.end(); i = review(i)) + ; } void ErrorCheck::delivered(const EventFrame& e) { + FrameQueue::iterator i = frames.insert(frames.end(), e); + review(i); +} + +// Review a frame in the queue with respect to the current error. 
+ErrorCheck::FrameQueue::iterator ErrorCheck::review(const FrameQueue::iterator& i) { + FrameQueue::iterator next = i+1; if (isUnresolved()) { const ClusterErrorCheckBody* errorCheck = 0; - if (e.frame.getBody()) + if (i->frame.getBody()) errorCheck = dynamic_cast<const ClusterErrorCheckBody*>( - e.frame.getMethod()); + i->frame.getMethod()); if (errorCheck && errorCheck->getFrameSeq() == frameSeq) { // Same error + next = frames.erase(i); // Drop matching error check controls if (errorCheck->getType() < type) { // my error is worse than his QPID_LOG(critical, cluster << " error " << frameSeq - << " did not occur on " << e.getMemberId()); + << " did not occur on " << i->getMemberId()); throw Exception("Aborted by local failure that did not occur on all replicas"); } else { // his error is worse/same as mine. QPID_LOG(debug, cluster << " error " << frameSeq - << " outcome agrees with " << e.getMemberId()); - unresolved.erase(e.getMemberId()); + << " outcome agrees with " << i->getMemberId()); + unresolved.erase(i->getMemberId()); checkResolved(); } } else { - frames.push_back(e); // Only drop matching errorCheck controls. 
const ClusterConfigChangeBody* configChange = 0; - if (e.frame.getBody()) - configChange = dynamic_cast<const ClusterConfigChangeBody*>(e.frame.getMethod()); + if (i->frame.getBody()) + configChange = dynamic_cast<const ClusterConfigChangeBody*>(i->frame.getMethod()); if (configChange) { MemberSet members(ClusterMap::decode(configChange->getCurrent())); - MemberSet result; + QPID_LOG(debug, cluster << " apply config change to unresolved: " + << members); + + MemberSet intersect; set_intersection(members.begin(), members.end(), unresolved.begin(), unresolved.end(), - inserter(result, result.begin())); - unresolved.swap(result); + inserter(intersect, intersect.begin())); + unresolved.swap(intersect); checkResolved(); } } } - else - frames.push_back(e); + return next; } void ErrorCheck::checkResolved() { Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/ErrorCheck.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/ErrorCheck.h?rev=792991&r1=792990&r2=792991&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/ErrorCheck.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/ErrorCheck.h Fri Jul 10 15:42:36 2009 @@ -41,8 +41,8 @@ /** * Error checking logic. * - * When an error occurs stop processing frames and queue them until we - * can determine if all nodes experienced the error. If not, we shut down. + * When an error occurs queue up frames until we can determine if all + * nodes experienced the error. If not, we shut down. 
*/ class ErrorCheck { @@ -59,18 +59,22 @@ /** Called when a frame is delivered */ void delivered(const EventFrame&); + /*...@pre canProcess **/ EventFrame getNext(); bool canProcess() const; + bool isUnresolved() const; private: + typedef std::deque<EventFrame> FrameQueue; + FrameQueue::iterator review(const FrameQueue::iterator&); void checkResolved(); Cluster& cluster; Multicaster& mcast; - std::deque<EventFrame> frames; - std::set<MemberId> unresolved; + FrameQueue frames; + MemberSet unresolved; uint64_t frameSeq; ErrorType type; Connection* connection; --------------------------------------------------------------------- Apache Qpid - AMQP Messaging Implementation Project: http://qpid.apache.org Use/Interact: mailto:commits-subscr...@qpid.apache.org