Author: aconway Date: Wed Jul 29 23:34:57 2009 New Revision: 799124 URL: http://svn.apache.org/viewvc?rev=799124&view=rev Log: Provide more informative cluster logging at notice level
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp qpid/trunk/qpid/cpp/src/qpid/cluster/ErrorCheck.cpp qpid/trunk/qpid/cpp/src/qpid/cluster/Quorum_cman.cpp qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=799124&r1=799123&r2=799124&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Wed Jul 29 23:34:57 2009 @@ -228,7 +228,8 @@ void Cluster::initialize() { if (myUrl.empty()) myUrl = Url::getIpAddressesUrl(broker.getPort(broker::Broker::TCP_TRANSPORT)); - QPID_LOG(notice, *this << " joining cluster " << name << " with url=" << myUrl); + QPID_LOG(notice, *this << " member " << self << " joining " + << name << " with url=" << myUrl); broker.getKnownBrokers = boost::bind(&Cluster::getUrls, this); broker.setExpiryPolicy(expiryPolicy); dispatcher.start(); @@ -240,7 +241,7 @@ // Called in connection thread to insert a client connection. void Cluster::addLocalConnection(const boost::intrusive_ptr<Connection>& c) { - QPID_LOG(debug, *this << " add local connection " << c->getId()); + QPID_LOG(info, *this << " new local connection " << c->getId()); localConnections.insert(c); assert(c->getId().getMember() == self); // Announce the connection to the cluster. @@ -250,7 +251,7 @@ // Called in connection thread to insert an updated shadow connection. void Cluster::addShadowConnection(const boost::intrusive_ptr<Connection>& c) { - QPID_LOG(debug, *this << " add shadow connection " << c->getId()); + QPID_LOG(info, *this << " new shadow connection " << c->getId()); // Safe to use connections here because we're pre-catchup, either // discarding or stalled, so deliveredFrame is not processing any // connection events. 
@@ -267,7 +268,7 @@ // Called by Connection::deliverClose() in deliverFrameQueue thread. void Cluster::erase(const ConnectionId& id, Lock&) { - QPID_LOG(debug, *this << " erasing connection " << id); + QPID_LOG(info, *this << " connection closed " << id); connections.erase(id); decoder.erase(id); } @@ -357,7 +358,7 @@ // This preserves the connection decoder fragments for an update. const ClusterUpdateOfferBody* offer = castUpdateOffer(ef.frame.getBody()); if (offer) { - QPID_LOG(debug, *this << " stall for update offer from " << e.getMemberId() + QPID_LOG(info, *this << " stall for update offer from " << e.getMemberId() << " to " << MemberId(offer->getUpdatee())); deliverEventQueue.stop(); } @@ -440,7 +441,7 @@ connection->deliveredFrame(e); } else - QPID_LOG(debug, *this << " DROP (no connection): " << e); + QPID_LOG(trace, *this << " DROP (no connection): " << e); } else // Drop connection frames while state < CATCHUP QPID_LOG(trace, *this << " DROP (joining): " << e); @@ -517,7 +518,7 @@ broker.setRecovery(nCurrent == 1); initialized = true; } - QPID_LOG(debug, *this << " config change: " << AddrList(current, nCurrent) + QPID_LOG(notice, *this << " membership change: " << AddrList(current, nCurrent) << AddrList(left, nLeft, "left: ")); std::string addresses; for (cpg_address* p = current; p < current+nCurrent; ++p) @@ -553,7 +554,6 @@ } else { // Joining established group. 
state = JOINER; - QPID_LOG(info, *this << " joining cluster: " << map); mcast.mcastControl(ClusterUpdateRequestBody(ProtocolVersion(), myUrl.str()), self); elders = map.getAlive(); elders.erase(self); @@ -603,7 +603,7 @@ memberUpdate(l); if (state == CATCHUP && id == self) { setReady(l); - QPID_LOG(notice, *this << " caught up, active cluster member"); + QPID_LOG(notice, *this << " caught up, active cluster member."); } } @@ -635,7 +635,7 @@ assert(state == JOINER); setClusterId(uuid, l); state = UPDATEE; - QPID_LOG(info, *this << " receiving update from " << updater); + QPID_LOG(notice, *this << " receiving update from " << updater); checkUpdateIn(l); } else { @@ -662,7 +662,6 @@ if (updater == self) { assert(state == OFFER); if (url) { // My offer was first. - QPID_LOG(info, *this << " retracting offer to " << updatee); if (updateThread.id()) updateThread.join(); // Join the previous updateThread to avoid leaks. updateThread = Thread(new RetractClient(*url, connectionSettings(settings))); @@ -679,7 +678,7 @@ if (state == LEFT) return; assert(state == OFFER); state = UPDATER; - QPID_LOG(info, *this << " sending update to " << updatee << " at " << url); + QPID_LOG(notice, *this << " sending update to " << updatee << " at " << url); if (updateThread.id()) updateThread.join(); // Join the previous updateThread to avoid leaks. updateThread = Thread( @@ -711,13 +710,13 @@ mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), self); state = CATCHUP; discarding = false; // ok to set, we're stalled for update. 
- QPID_LOG(info, *this << " received update, starting catch-up"); + QPID_LOG(notice, *this << " update complete, starting catch-up, members: " << map); deliverEventQueue.start(); } else if (updateRetracted) { // Update was retracted, request another update updateRetracted = false; state = JOINER; - QPID_LOG(info, *this << " re-try joining after retracted update"); + QPID_LOG(notice, *this << " update retracted, sending new update request"); mcast.mcastControl(ClusterUpdateRequestBody(ProtocolVersion(), myUrl.str()), self); deliverEventQueue.start(); } @@ -729,7 +728,7 @@ } void Cluster::updateOutDone(Lock& l) { - QPID_LOG(info, *this << " sent update"); + QPID_LOG(notice, *this << " update sent"); assert(state == UPDATER); state = READY; mcast.release(); @@ -834,9 +833,9 @@ "INIT", "JOINER", "UPDATEE", "CATCHUP", "READY", "OFFER", "UPDATER", "LEFT" }; assert(sizeof(STATE)/sizeof(*STATE) == Cluster::LEFT+1); - o << cluster.self << "(" << STATE[cluster.state]; + o << "cluster:" << STATE[cluster.state]; if (cluster.settings.checkErrors && cluster.error.isUnresolved()) o << "/error"; - return o << ")"; + return o; } MemberId Cluster::getId() const { @@ -863,7 +862,7 @@ mgmtObject->set_clusterID(clusterId.str()); mgmtObject->set_memberID(stream.str()); } - QPID_LOG(debug, *this << " cluster-id = " << clusterId); + QPID_LOG(debug, *this << " cluster-uuid = " << clusterId); } void Cluster::messageExpired(const MemberId&, uint64_t id, Lock&) { @@ -875,7 +874,7 @@ // ErrorCheck class) then we have processed succesfully past the // point of the error. 
if (state >= CATCHUP && type != ERROR_TYPE_NONE) { - QPID_LOG(debug, *this << " error " << frameSeq << " did not occur locally."); + QPID_LOG(notice, *this << " error " << frameSeq << " did not occur locally."); mcast.mcastControl( ClusterErrorCheckBody(ProtocolVersion(), ERROR_TYPE_NONE, frameSeq), self); } Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp?rev=799124&r1=799123&r2=799124&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp Wed Jul 29 23:34:57 2009 @@ -439,13 +439,13 @@ void Connection::exchange(const std::string& encoded) { Buffer buf(const_cast<char*>(encoded.data()), encoded.size()); broker::Exchange::shared_ptr ex = broker::Exchange::decode(cluster.getBroker().getExchanges(), buf); - QPID_LOG(debug, cluster << " decoded exchange " << ex->getName()); + QPID_LOG(debug, cluster << " updated exchange " << ex->getName()); } void Connection::queue(const std::string& encoded) { Buffer buf(const_cast<char*>(encoded.data()), encoded.size()); broker::Queue::shared_ptr q = broker::Queue::decode(cluster.getBroker().getQueues(), buf); - QPID_LOG(debug, cluster << " decoded queue " << q->getName()); + QPID_LOG(debug, cluster << " updated queue " << q->getName()); } void Connection::sessionError(uint16_t , const std::string& msg) { Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/ErrorCheck.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/ErrorCheck.cpp?rev=799124&r1=799123&r2=799124&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/ErrorCheck.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/ErrorCheck.cpp Wed Jul 29 23:34:57 2009 @@ -57,7 +57,7 @@ QPID_LOG(error, cluster << (type == 
ERROR_TYPE_SESSION ? " channel" : " connection") << " error " << frameSeq << " on " << c << ": " << msg - << " (unresolved: " << unresolved << ")"); + << " must be resolved with: " << unresolved); mcast.mcastControl( ClusterErrorCheckBody(ProtocolVersion(), type, frameSeq), c.getId().getMember()); // If there are already frames queued up by a previous error, review @@ -87,8 +87,8 @@ throw Exception("Aborted by failure that did not occur on all replicas"); } else { // his error is worse/same as mine. - QPID_LOG(debug, cluster << " error " << frameSeq - << " outcome agrees with " << i->getMemberId()); + QPID_LOG(notice, cluster << " error " << frameSeq + << " resolved with " << i->getMemberId()); unresolved.erase(i->getMemberId()); checkResolved(); } @@ -117,10 +117,11 @@ void ErrorCheck::checkResolved() { if (unresolved.empty()) { // No more potentially conflicted members, we're clear. type = ERROR_TYPE_NONE; - QPID_LOG(debug, cluster << " Error " << frameSeq << " resolved."); + QPID_LOG(notice, cluster << " error " << frameSeq << " resolved."); } else - QPID_LOG(debug, cluster << " Error " << frameSeq << " still unresolved: " << unresolved); + QPID_LOG(notice, cluster << " error " << frameSeq + << " must be resolved with " << unresolved); } EventFrame ErrorCheck::getNext() { Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Quorum_cman.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Quorum_cman.cpp?rev=799124&r1=799123&r2=799124&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/Quorum_cman.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/Quorum_cman.cpp Wed Jul 29 23:34:57 2009 @@ -31,7 +31,6 @@ Quorum::~Quorum() { if (cman) cman_finish(cman); } void Quorum::init() { - QPID_LOG(info, "Waiting for cluster quorum"); enable = true; cman = cman_init(0); if (cman == 0) throw ErrnoException("Can't connect to cman service"); Modified: 
qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp?rev=799124&r1=799123&r2=799124&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp Wed Jul 29 23:34:57 2009 @@ -124,7 +124,8 @@ } void UpdateClient::update() { - QPID_LOG(debug, updaterId << " updating state to " << updateeId << " at " << updateeUrl); + QPID_LOG(debug, updaterId << " updating state to " << updateeId + << " at " << updateeUrl); Broker& b = updaterBroker; b.getExchanges().eachExchange(boost::bind(&UpdateClient::updateExchange, this, _1)); b.getQueues().eachQueue(boost::bind(&UpdateClient::updateNonExclusiveQueue, this, _1)); --------------------------------------------------------------------- Apache Qpid - AMQP Messaging Implementation Project: http://qpid.apache.org Use/Interact: mailto:commits-subscribe@qpid.apache.org