Author: aconway Date: Thu Aug 6 17:41:18 2009 New Revision: 801740 URL: http://svn.apache.org/viewvc?rev=801740&view=rev Log: Fix cman integration to exit immediately on loss of quorum.
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterSettings.h qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.cpp qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.h qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp qpid/trunk/qpid/cpp/src/qpid/cluster/PollerDispatch.cpp qpid/trunk/qpid/cpp/src/qpid/cluster/PollerDispatch.h qpid/trunk/qpid/cpp/src/qpid/cluster/Quorum_cman.cpp qpid/trunk/qpid/cpp/src/qpid/cluster/Quorum_cman.h qpid/trunk/qpid/cpp/src/qpid/cluster/Quorum_null.h Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=801740&r1=801739&r2=801740&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Thu Aug 6 17:41:18 2009 @@ -190,6 +190,7 @@ boost::bind(&Cluster::leave, this), "Error delivering frames", poller), + quorum(boost::bind(&Cluster::leave, this)), initialized(false), decoder(boost::bind(&Cluster::deliverFrame, this, _1)), discarding(true), @@ -214,7 +215,6 @@ // Update exchange is used during updates to replicate messages without modifying delivery-properties.exchange. broker.getExchanges().registerExchange(boost::shared_ptr<broker::Exchange>(new UpdateExchange(this))); - if (settings.quorum) quorum.init(); cpg.join(name); // pump the CPG dispatch manually till we get initialized. 
while (!initialized) @@ -226,10 +226,10 @@ } void Cluster::initialize() { + if (settings.quorum) quorum.start(poller); if (myUrl.empty()) myUrl = Url::getIpAddressesUrl(broker.getPort(broker::Broker::TCP_TRANSPORT)); - QPID_LOG(notice, *this << " member " << self << " joining " - << name << " with url=" << myUrl); + QPID_LOG(notice, *this << " joining cluster " << name << " with url=" << myUrl); broker.getKnownBrokers = boost::bind(&Cluster::getUrls, this); broker.setExpiryPolicy(expiryPolicy); dispatcher.start(); @@ -404,6 +404,7 @@ LATENCY_TRACK(frameQueueLatencyTracker.finish(e.frame.getBody())); LATENCY_TRACK(if (e.frame.getBody()->type() == CONTENT_BODY) doOutputTracker.start(e.frame.getBody())); Mutex::ScopedLock l(lock); + if (state == LEFT) return; EventFrame e(efConst); const ClusterUpdateOfferBody* offer = castUpdateOffer(e.frame.getBody()); if (offer && error.isUnresolved()) { @@ -510,7 +511,7 @@ const cpg_name */*group*/, const cpg_address *current, int nCurrent, const cpg_address *left, int nLeft, - const cpg_address */*joined*/, int /*nJoined*/) + const cpg_address *joined, int nJoined) { Mutex::ScopedLock l(lock); if (state == INIT) { // First config change. 
@@ -518,8 +519,11 @@ broker.setRecovery(nCurrent == 1); initialized = true; } - QPID_LOG(notice, *this << " membership change: " << AddrList(current, nCurrent) - << AddrList(left, nLeft, "left: ")); + QPID_LOG(notice, *this << " membership change: " + << AddrList(current, nCurrent) << "(" + << AddrList(joined, nJoined, "joined: ") + << AddrList(left, nLeft, "left: ") + << ")"); std::string addresses; for (const cpg_address* p = current; p < current+nCurrent; ++p) addresses.append(MemberId(*p).str()); @@ -833,9 +837,9 @@ "INIT", "JOINER", "UPDATEE", "CATCHUP", "READY", "OFFER", "UPDATER", "LEFT" }; assert(sizeof(STATE)/sizeof(*STATE) == Cluster::LEFT+1); - o << "cluster:" << STATE[cluster.state]; + o << "cluster(" << cluster.self << " " << STATE[cluster.state]; if (cluster.settings.checkErrors && cluster.error.isUnresolved()) o << "/error"; - return o; + return o << ")";; } MemberId Cluster::getId() const { @@ -846,14 +850,6 @@ return broker; // Immutable, no need to lock. } -void Cluster::checkQuorum() { - if (!quorum.isQuorate()) { - QPID_LOG(critical, *this << " disconnected from cluster quorum, shutting down"); - leave(); - throw Exception(QPID_MSG(*this << " disconnected from cluster quorum.")); - } -} - void Cluster::setClusterId(const Uuid& uuid, Lock&) { clusterId = uuid; if (mgmtObject) { Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h?rev=801740&r1=801739&r2=801740&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h Thu Aug 6 17:41:18 2009 @@ -101,8 +101,6 @@ broker::Broker& getBroker() const; Multicaster& getMulticast() { return mcast; } - void checkQuorum(); - const ClusterSettings& getSettings() const { return settings; } void deliverFrame(const EventFrame&); Modified: 
qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp?rev=801740&r1=801739&r2=801740&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp Thu Aug 6 17:41:18 2009 @@ -72,7 +72,7 @@ ("cluster-cman", optValue(settings.quorum), "Integrate with Cluster Manager (CMAN) cluster.") #endif ("cluster-read-max", optValue(settings.readMax,"N"), "Experimental: flow-control limit reads per connection. 0=no limit.") - // FIXME aconway 2009-05-20: temporary + // TODO aconway 2009-05-20: temporary, remove ("cluster-check-errors", optValue(settings.checkErrors, "yes|no"), "Enable/disable cluster error checks. Normally should be enabled.") ; } Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterSettings.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterSettings.h?rev=801740&r1=801739&r2=801740&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterSettings.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterSettings.h Thu Aug 6 17:41:18 2009 @@ -37,7 +37,7 @@ bool checkErrors; ClusterSettings() : quorum(false), readMax(10), - checkErrors(true) // FIXME aconway 2009-05-20: temporary + checkErrors(true) // TODO aconway 2009-05-20: remove this option. 
{} Url getUrl(uint16_t port) const { Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.cpp?rev=801740&r1=801739&r2=801740&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.cpp Thu Aug 6 17:41:18 2009 @@ -105,7 +105,7 @@ QPID_LOG(info, "Initializing CPG"); cpg_error_t err = cpg_initialize(&handle, &callbacks); - int retries = 6; + int retries = 6; // FIXME aconway 2009-08-06: configure, use same config for cman connection. while (err == CPG_ERR_TRY_AGAIN && --retries) { QPID_LOG(notice, "Re-trying CPG initialization."); sys::sleep(5); Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.h?rev=801740&r1=801739&r2=801740&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.h Thu Aug 6 17:41:18 2009 @@ -20,7 +20,6 @@ */ #include "qpid/Exception.h" -#include "qpid/cluster/Dispatchable.h" #include "qpid/cluster/types.h" #include "qpid/sys/IOHandle.h" #include "qpid/sys/Mutex.h" Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp?rev=801740&r1=801739&r2=801740&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp Thu Aug 6 17:41:18 2009 @@ -46,7 +46,6 @@ void OutputInterceptor::send(framing::AMQFrame& f) { LATENCY_TRACK(doOutputTracker.finish(f.getBody())); - parent.getCluster().checkQuorum(); { sys::Mutex::ScopedLock l(lock); next->send(f); Modified: 
qpid/trunk/qpid/cpp/src/qpid/cluster/PollerDispatch.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/PollerDispatch.cpp?rev=801740&r1=801739&r2=801740&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/PollerDispatch.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/PollerDispatch.cpp Thu Aug 6 17:41:18 2009 @@ -33,15 +33,18 @@ boost::bind(&PollerDispatch::dispatch, this, _1), // read 0, // write boost::bind(&PollerDispatch::disconnect, this, _1) // disconnect - ) + ), + started(false) {} PollerDispatch::~PollerDispatch() { - dispatchHandle.stopWatch(); + if (started) + dispatchHandle.stopWatch(); } void PollerDispatch::start() { dispatchHandle.startWatch(poller); + started = true; } // Entry point: called by IO to dispatch CPG events. Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/PollerDispatch.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/PollerDispatch.h?rev=801740&r1=801739&r2=801740&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/PollerDispatch.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/PollerDispatch.h Thu Aug 6 17:41:18 2009 @@ -51,6 +51,7 @@ boost::shared_ptr<sys::Poller> poller; boost::function<void()> onError; sys::DispatchHandleRef dispatchHandle; + bool started; }; Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Quorum_cman.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Quorum_cman.cpp?rev=801740&r1=801739&r2=801740&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/Quorum_cman.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/Quorum_cman.cpp Thu Aug 6 17:41:18 2009 @@ -18,28 +18,86 @@ * under the License. 
* */ + #include "qpid/cluster/Quorum_cman.h" +#include "qpid/cluster/Cluster.h" #include "qpid/log/Statement.h" #include "qpid/Options.h" #include "qpid/sys/Time.h" +#include "qpid/sys/posix/PrivatePosix.h" namespace qpid { namespace cluster { -Quorum::Quorum() : enable(false), cman(0) {} +namespace { + +boost::function<void()> errorFn; -Quorum::~Quorum() { if (cman) cman_finish(cman); } +void cmanCallbackFn(cman_handle_t handle, void */*privdata*/, int reason, int arg) { + if (reason == CMAN_REASON_STATECHANGE && arg == 0) { + QPID_LOG(critical, "Lost contact with cluster quorum."); + if (errorFn) errorFn(); + cman_stop_notification(handle); + } +} +} + +Quorum::Quorum(boost::function<void()> err) : enable(false), cman(0), cmanFd(0) { + errorFn = err; +} + +Quorum::~Quorum() { + dispatchHandle.reset(); + if (cman) cman_finish(cman); +} -void Quorum::init() { +void Quorum::start(boost::shared_ptr<sys::Poller> p) { + poller = p; enable = true; + QPID_LOG(debug, "Connecting to quorum service."); cman = cman_init(0); if (cman == 0) throw ErrnoException("Can't connect to cman service"); if (!cman_is_quorate(cman)) { QPID_LOG(notice, "Waiting for cluster quorum."); while(!cman_is_quorate(cman)) sys::sleep(5); } + int err = cman_start_notification(cman, cmanCallbackFn); + if (err != 0) throw ErrnoException("Can't register for cman notifications"); + watch(getFd()); } -bool Quorum::isQuorate() { return enable ? 
cman_is_quorate(cman) : true; } +void Quorum::watch(int fd) { + cmanFd = fd; + dispatchHandle.reset( + new sys::DispatchHandleRef( + sys::PosixIOHandle(cmanFd), + boost::bind(&Quorum::dispatch, this, _1), // read + 0, // write + boost::bind(&Quorum::disconnect, this, _1) // disconnect + )); + dispatchHandle->startWatch(poller); +} + +int Quorum::getFd() { + int fd = cman_get_fd(cman); + if (fd == 0) throw ErrnoException("Can't get cman file descriptor"); + return fd; +} + +void Quorum::dispatch(sys::DispatchHandle&) { + try { + cman_dispatch(cman, CMAN_DISPATCH_ALL); + int fd = getFd(); + if (fd != cmanFd) watch(fd); + } catch (const std::exception& e) { + QPID_LOG(critical, "Error in quorum dispatch: " << e.what()); + errorFn(); + } +} + +void Quorum::disconnect(sys::DispatchHandle&) { + QPID_LOG(critical, "Disconnected from quorum service"); + errorFn(); +} }} // namespace qpid::cluster Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Quorum_cman.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Quorum_cman.h?rev=801740&r1=801739&r2=801740&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/Quorum_cman.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/Quorum_cman.h Thu Aug 6 17:41:18 2009 @@ -22,26 +22,40 @@ * */ +#include <qpid/sys/DispatchHandle.h> +#include <boost/function.hpp> +#include <boost/shared_ptr.hpp> +#include <memory> + extern "C" { #include <libcman.h> } namespace qpid { - -class Options; +namespace sys { +class Poller; +} namespace cluster { +class Cluster; class Quorum { public: - Quorum(); + Quorum(boost::function<void ()> onError); ~Quorum(); - void init(); - bool isQuorate(); + void start(boost::shared_ptr<sys::Poller>); private: + void dispatch(sys::DispatchHandle&); + void disconnect(sys::DispatchHandle&); + int getFd(); + void watch(int fd); + bool enable; cman_handle_t cman; + int cmanFd; + std::auto_ptr<sys::DispatchHandleRef> 
dispatchHandle; + boost::shared_ptr<sys::Poller> poller; }; Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Quorum_null.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Quorum_null.h?rev=801740&r1=801739&r2=801740&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/Quorum_null.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/Quorum_null.h Thu Aug 6 17:41:18 2009 @@ -21,15 +21,20 @@ * under the License. * */ + +#include <boost/shared_ptr.hpp> +#include <boost/function.hpp> + namespace qpid { namespace cluster { +class Cluster; /** Null implementation of quorum. */ class Quorum { public: - void init() {} - bool isQuorate() { return true; } + Quorum(boost::function<void ()>) {} + void start(boost::shared_ptr<sys::Poller>) {} }; #endif /*!QPID_CLUSTER_QUORUM_NULL_H*/ --------------------------------------------------------------------- Apache Qpid - AMQP Messaging Implementation Project: http://qpid.apache.org Use/Interact: mailto:commits-subscribe@qpid.apache.org