Author: gsim
Date: Thu Jan 22 14:53:50 2009
New Revision: 736841

URL: http://svn.apache.org/viewvc?rev=736841&view=rev
Log:
QPID-1567: More changes to make clustering and federation work together
* replicate outgoing link traffic to all nodes * coordinate amongst nodes so that only one node actually maintains active links with the others able to take over if that node fails Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp qpid/trunk/qpid/cpp/src/qpid/broker/Link.h qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.cpp qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.h qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.cpp qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.h qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.h Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp?rev=736841&r1=736840&r2=736841&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp Thu Jan 22 14:53:50 2009 @@ -72,6 +72,7 @@ if (!args.i_durable) agent->addObject(mgmtObject); } + QPID_LOG(debug, "Bridge created from " << args.i_src << " to " << args.i_dest); } Bridge::~Bridge() @@ -104,10 +105,11 @@ session->attach(name, false); session->commandPoint(0,0); - if (args.i_srcIsQueue) { + if (args.i_srcIsQueue) { peer->getMessage().subscribe(args.i_src, args.i_dest, args.i_sync ? 
0 : 1, 0, false, "", 0, options); peer->getMessage().flow(args.i_dest, 0, 0xFFFFFFFF); peer->getMessage().flow(args.i_dest, 1, 0xFFFFFFFF); + QPID_LOG(debug, "Activated route from queue " << args.i_src << " to " << args.i_dest); } else { FieldTable queueSettings; @@ -141,6 +143,9 @@ if (exchange.get() == 0) throw Exception("Exchange not found for dynamic route"); exchange->registerDynamicBridge(this); + QPID_LOG(debug, "Activated dynamic route for exchange " << args.i_src); + } else { + QPID_LOG(debug, "Activated static route from exchange " << args.i_src << " to " << args.i_dest); } } } Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h?rev=736841&r1=736840&r2=736841&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h Thu Jan 22 14:53:50 2009 @@ -117,7 +117,7 @@ ChannelMap channels; //framing::AMQP_ClientProxy::Connection* client; ConnectionHandler adapter; - bool isLink; + const bool isLink; bool mgmtClosing; const std::string mgmtId; boost::function0<void> ioCallback; Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp?rev=736841&r1=736840&r2=736841&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp Thu Jan 22 14:53:50 2009 @@ -106,6 +106,7 @@ case STATE_OPERATIONAL : mgmtObject->set_state("Operational"); break; case STATE_FAILED : mgmtObject->set_state("Failed"); break; case STATE_CLOSED : mgmtObject->set_state("Closed"); break; + case STATE_PASSIVE : mgmtObject->set_state("Passive"); break; } } @@ -239,6 +240,7 @@ if (state != STATE_OPERATIONAL) return; + QPID_LOG(debug, 
"Link::ioThreadProcessing()"); //process any pending creates if (!created.empty()) { @@ -404,6 +406,7 @@ case _qmf::Link::METHOD_BRIDGE : _qmf::ArgsLinkBridge& iargs = (_qmf::ArgsLinkBridge&) args; + QPID_LOG(debug, "Link::bridge() request received"); // Durable bridges are only valid on durable links if (iargs.i_durable && !durable) { @@ -437,3 +440,17 @@ return Manageable::STATUS_UNKNOWN_METHOD; } + +void Link::setPassive(bool passive) +{ + Mutex::ScopedLock mutex(lock); + if (passive) { + setStateLH(STATE_PASSIVE); + } else { + if (state == STATE_PASSIVE) { + setStateLH(STATE_WAITING); + } else { + QPID_LOG(warning, "Ignoring attempt to activate non-passive link"); + } + } +} Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Link.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Link.h?rev=736841&r1=736840&r2=736841&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/broker/Link.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/broker/Link.h Thu Jan 22 14:53:50 2009 @@ -76,6 +76,7 @@ static const int STATE_OPERATIONAL = 3; static const int STATE_FAILED = 4; static const int STATE_CLOSED = 5; + static const int STATE_PASSIVE = 6; static const uint32_t MAX_INTERVAL = 32; @@ -120,6 +121,7 @@ Broker* getBroker() { return broker; } void notifyConnectionForced(const std::string text); + void setPassive(bool p); // PersistableConfig: void setPersistenceId(uint64_t id) const; Modified: qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.cpp?rev=736841&r1=736840&r2=736841&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.cpp Thu Jan 22 14:53:50 2009 @@ -31,7 +31,7 @@ #define LINK_MAINT_INTERVAL 2 -LinkRegistry::LinkRegistry (Broker* _broker) : 
broker(_broker), parent(0), store(0) +LinkRegistry::LinkRegistry (Broker* _broker) : broker(_broker), parent(0), store(0), passive(false), passiveChanged(false) { timer.add (intrusive_ptr<TimerTask> (new Periodic(*this))); } @@ -51,6 +51,14 @@ linksToDestroy.clear(); bridgesToDestroy.clear(); + if (passiveChanged) { + if (passive) { QPID_LOG(info, "Passivating links"); } + else { QPID_LOG(info, "Activating links"); } + for (LinkMap::iterator i = links.begin(); i != links.end(); i++) { + i->second->setPassive(passive); + } + passiveChanged = false; + } for (LinkMap::iterator i = links.begin(); i != links.end(); i++) i->second->maintenanceVisit(); //now process any requests for re-addressing @@ -109,6 +117,7 @@ link = Link::shared_ptr (new Link (this, store, host, port, transport, durable, authMechanism, username, password, broker, parent)); + if (passive) link->setPassive(true); links[key] = link; return std::pair<Link::shared_ptr, bool>(link, true); } @@ -129,6 +138,8 @@ uint16_t sync) { Mutex::ScopedLock locker(lock); + QPID_LOG(debug, "Bridge declared " << host << ": " << port << " from " << src << " to " << dest << " (" << key << ")"); + stringstream keystream; keystream << host << ":" << port; string linkKey = string(keystream.str()); @@ -291,3 +302,11 @@ keystream << a.host << ":" << a.port; return string(keystream.str()); } + +void LinkRegistry::setPassive(bool p) +{ + Mutex::ScopedLock locker(lock); + passiveChanged = p != passive; + passive = p; + //will activate or passivate links on maintenance visit +} Modified: qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.h?rev=736841&r1=736840&r2=736841&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.h Thu Jan 22 14:53:50 2009 @@ -64,6 +64,8 @@ Timer timer; 
management::Manageable* parent; MessageStore* store; + bool passive; + bool passiveChanged; void periodicMaintenance (); bool updateAddress(const std::string& oldKey, const TcpAddress& newAddress); @@ -122,7 +124,17 @@ std::string getAuthCredentials (const std::string& key); std::string getAuthIdentity (const std::string& key); + /** + * Called by links failing over to new address + */ void changeAddress(const TcpAddress& oldAddress, const TcpAddress& newAddress); + /** + * Called to alter passive state. In passive state the links + * and bridges managed by a link registry will be recorded and + * updated but links won't actually establish connections and + * bridges won't therefore pull or push any messages. + */ + void setPassive(bool); }; } } Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=736841&r1=736840&r2=736841&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Thu Jan 22 14:53:50 2009 @@ -323,10 +323,20 @@ state = NEWBIE; QPID_LOG(info, *this << " joining cluster: " << map); mcast.mcastControl(ClusterDumpRequestBody(ProtocolVersion(), myUrl.str()), myId); + ClusterMap::Set members = map.getAlive(); + members.erase(myId); + myElders = members; + broker.getLinks().setPassive(true); } } - else if (state >= READY && memberChange) + else if (state >= READY && memberChange) { memberUpdate(l); + myElders = ClusterMap::intersection(myElders, map.getAlive()); + if (myElders.empty()) { + //assume we are oldest, reactive links if necessary + broker.getLinks().setPassive(false); + } + } } @@ -496,6 +506,8 @@ } lastSize = size; + // + if (mgmtObject) { mgmtObject->set_clusterSize(size); string urlstr; Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h?rev=736841&r1=736840&r2=736841&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h Thu Jan 22 14:53:50 2009 @@ -184,6 +184,7 @@ const size_t writeEstimate; framing::Uuid clusterId; NoOpConnectionOutputHandler shadowOut; + ClusterMap::Set myElders; // Thread safe members Multicaster mcast; Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.cpp?rev=736841&r1=736840&r2=736841&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.cpp Thu Jan 22 14:53:50 2009 @@ -114,6 +114,10 @@ return urls; } +ClusterMap::Set ClusterMap::getAlive() const { + return alive; +} + std::ostream& operator<<(std::ostream& o, const ClusterMap::Map& m) { std::ostream_iterator<MemberId> oi(o); std::transform(m.begin(), m.end(), oi, boost::bind(&ClusterMap::Map::value_type::first, _1)); @@ -170,4 +174,13 @@ return boost::optional<Url>(); } +ClusterMap::Set ClusterMap::intersection(const ClusterMap::Set& a, const ClusterMap::Set& b) +{ + Set intersection; + std::set_intersection(a.begin(), a.end(), + b.begin(), b.end(), + std::inserter(intersection, intersection.begin())); + return intersection; + +} }} // namespace qpid::cluster Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.h?rev=736841&r1=736840&r2=736841&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.h Thu Jan 22 14:53:50 2009 @@ 
-76,6 +76,7 @@ size_t aliveCount() const { return alive.size(); } size_t memberCount() const { return members.size(); } std::vector<Url> memberUrls() const; + Set getAlive() const; bool dumpRequest(const MemberId& id, const std::string& url); /** Return non-empty Url if accepted */ @@ -84,6 +85,10 @@ /*...@return true If this is a new member */ bool ready(const MemberId& id, const Url&); + /** + * Utility method to return intersection of two member sets + */ + static Set intersection(const Set& a, const Set& b); private: Url getUrl(const Map& map, const MemberId& id); Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp?rev=736841&r1=736840&r2=736841&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp Thu Jan 22 14:53:50 2009 @@ -62,14 +62,15 @@ Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out, const std::string& wrappedId, ConnectionId myId) : cluster(c), self(myId), catchUp(false), output(*this, out), - connection(&output, cluster.getBroker(), wrappedId), readCredit(0) + connection(&output, cluster.getBroker(), wrappedId), readCredit(0), expectProtocolHeader(false) { init(); } // Local connections Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out, - const std::string& wrappedId, MemberId myId, bool isCatchUp) + const std::string& wrappedId, MemberId myId, bool isCatchUp, bool isLink) : cluster(c), self(myId, this), catchUp(isCatchUp), output(*this, out), - connection(&output, cluster.getBroker(), wrappedId), readCredit(0) + connection(&output, cluster.getBroker(), wrappedId, isLink), readCredit(0), + expectProtocolHeader(isLink) { init(); } void Connection::init() { @@ -213,7 +214,25 @@ } else { // Multicast local connections. 
assert(isLocal()); - cluster.getMulticast().mcastBuffer(buffer, size, self); + const char* remainingData = buffer; + size_t remainingSize = size; + if (expectProtocolHeader) { + //If this is an outgoing link, we will receive a protocol + //header which needs to be decoded first + framing::ProtocolInitiation pi; + Buffer buf(const_cast<char*>(buffer), size); + if (pi.decode(buf)) { + //TODO: check the version is correct + QPID_LOG(debug, "Outgoing clustered link connection received INIT(" << pi << ")"); + expectProtocolHeader = false; + remainingData = buffer + pi.encodedSize(); + remainingSize = size - pi.encodedSize(); + } else { + QPID_LOG(debug, "Not enough data for protocol header on outgoing clustered link"); + return 0; + } + } + cluster.getMulticast().mcastBuffer(remainingData, remainingSize, self); } return size; } Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h?rev=736841&r1=736840&r2=736841&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h Thu Jan 22 14:53:50 2009 @@ -64,7 +64,7 @@ typedef sys::PollableQueue<EventFrame> EventFrameQueue; /** Local connection, use this in ConnectionId */ - Connection(Cluster&, sys::ConnectionOutputHandler& out, const std::string& id, MemberId, bool catchUp); + Connection(Cluster&, sys::ConnectionOutputHandler& out, const std::string& id, MemberId, bool catchUp, bool isLink); /** Shadow connection */ Connection(Cluster&, sys::ConnectionOutputHandler& out, const std::string& id, ConnectionId); ~Connection(); @@ -172,6 +172,7 @@ framing::ChannelId currentChannel; boost::shared_ptr<broker::TxBuffer> txBuffer; int readCredit; + bool expectProtocolHeader; friend std::ostream& operator<<(std::ostream&, const Connection&); }; Modified: 
qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp?rev=736841&r1=736840&r2=736841&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp Thu Jan 22 14:53:50 2009 @@ -38,21 +38,22 @@ sys::ConnectionCodec* ConnectionCodec::Factory::create(ProtocolVersion v, sys::OutputControl& out, const std::string& id) { if (v == ProtocolVersion(0, 10)) - return new ConnectionCodec(out, id, cluster, false); + return new ConnectionCodec(out, id, cluster, false, false); else if (v == ProtocolVersion(0x80 + 0, 0x80 + 10)) - return new ConnectionCodec(out, id, cluster, true); // Catch-up connection + return new ConnectionCodec(out, id, cluster, true, false); // Catch-up connection return 0; } // Used for outgoing Link connections, we don't care. sys::ConnectionCodec* ConnectionCodec::Factory::create(sys::OutputControl& out, const std::string& id) { - return next->create(out, id); + return new ConnectionCodec(out, id, cluster, false, true); + //return next->create(out, id); } -ConnectionCodec::ConnectionCodec(sys::OutputControl& out, const std::string& id, Cluster& cluster, bool catchUp) - : codec(out, id, false), - interceptor(new Connection(cluster, codec, id, cluster.getId(), catchUp)), +ConnectionCodec::ConnectionCodec(sys::OutputControl& out, const std::string& id, Cluster& cluster, bool catchUp, bool isLink) + : codec(out, id, isLink), + interceptor(new Connection(cluster, codec, id, cluster.getId(), catchUp, isLink)), id(interceptor->getId()), localId(id) { Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.h?rev=736841&r1=736840&r2=736841&view=diff 
============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.h Thu Jan 22 14:53:50 2009 @@ -56,7 +56,7 @@ sys::ConnectionCodec* create(sys::OutputControl&, const std::string& id); }; - ConnectionCodec(sys::OutputControl& out, const std::string& id, Cluster& c, bool catchUp); + ConnectionCodec(sys::OutputControl& out, const std::string& id, Cluster& c, bool catchUp, bool isLink); ~ConnectionCodec(); // ConnectionCodec functions. --------------------------------------------------------------------- Apache Qpid - AMQP Messaging Implementation Project: http://qpid.apache.org Use/Interact: mailto:commits-subscr...@qpid.apache.org