Author: aconway Date: Mon Mar 2 23:30:08 2009 New Revision: 749473 URL: http://svn.apache.org/viewvc?rev=749473&view=rev Log:
Replicate connection decoder fragments to new members. Refactoring: - Merge Decoder into ConnectionMap. - Process cluster controls in event queue thread. - Use counter not pointer for connection ID, avoid re-use. - Do all processing in event queue thread to avoid races (temporary pending performance measurements) Removed: qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionDecoder.cpp qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionDecoder.h qpid/trunk/qpid/cpp/src/qpid/cluster/Decoder.cpp qpid/trunk/qpid/cpp/src/qpid/cluster/Decoder.h Modified: qpid/trunk/qpid/cpp/src/cluster.mk qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.h qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionMap.cpp qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionMap.h qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h qpid/trunk/qpid/cpp/src/qpid/cluster/EventFrame.h qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.h qpid/trunk/qpid/cpp/src/qpid/cluster/types.h qpid/trunk/qpid/cpp/src/qpid/framing/FrameDecoder.cpp qpid/trunk/qpid/cpp/src/qpid/framing/FrameDecoder.h qpid/trunk/qpid/cpp/src/tests/ClusterFixture.cpp qpid/trunk/qpid/cpp/src/tests/FrameDecoder.cpp qpid/trunk/qpid/cpp/xml/cluster.xml Modified: qpid/trunk/qpid/cpp/src/cluster.mk URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/cluster.mk?rev=749473&r1=749472&r2=749473&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/cluster.mk (original) +++ qpid/trunk/qpid/cpp/src/cluster.mk Mon Mar 2 23:30:08 2009 @@ -53,10 +53,6 @@ qpid/cluster/ConnectionMap.cpp \ qpid/cluster/Cpg.cpp \ qpid/cluster/Cpg.h \ - qpid/cluster/Decoder.cpp \ - qpid/cluster/Decoder.h \ - qpid/cluster/ConnectionDecoder.cpp \ - qpid/cluster/ConnectionDecoder.h \ qpid/cluster/Dispatchable.h \ qpid/cluster/UpdateClient.cpp \ qpid/cluster/UpdateClient.h \ Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=749473&r1=749472&r2=749473&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Mon Mar 2 23:30:08 2009 @@ -22,6 +22,7 @@ #include "UpdateClient.h" #include "FailoverExchange.h" +#include "qpid/assert.h" #include "qmf/org/apache/qpid/cluster/ArgsClusterStopClusterNode.h" #include "qmf/org/apache/qpid/cluster/Package.h" #include "qpid/broker/Broker.h" @@ -91,7 +92,7 @@ cpg(*this), name(settings.name), myUrl(settings.url.empty() ? Url() : Url(settings.url)), - myId(cpg.self()), + self(cpg.self()), readMax(settings.readMax), writeEstimate(settings.writeEstimate), mcast(cpg, poller, boost::bind(&Cluster::leave, this)), @@ -104,8 +105,7 @@ boost::bind(&Cluster::leave, this), "Error delivering frames", poller), - decoder(boost::bind(&PollableFrameQueue::push, &deliverFrameQueue, _1), connections), - expiryPolicy(new ExpiryPolicy(boost::bind(&Cluster::isLeader, this), mcast, myId, broker.getTimer())), + expiryPolicy(new ExpiryPolicy(boost::bind(&Cluster::isLeader, this), mcast, self, broker.getTimer())), frameId(0), initialized(false), state(INIT), @@ -213,7 +213,7 @@ framing::Buffer buf(static_cast<char*>(msg), msg_len); Event e(Event::decodeCopy(from, buf)); e.setSequence(sequence++); - if (from == myId) // Record self-deliveries for flow control. + if (from == self) // Record self-deliveries for flow control. mcast.selfDeliver(e); deliver(e); } @@ -227,42 +227,33 @@ // Handler for deliverEventQueue void Cluster::deliveredEvent(const Event& e) { QPID_LATENCY_RECORD("delivered event queue", e); - Buffer buf(const_cast<char*>(e.getData()), e.getSize()); - if (e.getType() == CONTROL) { - AMQFrame frame; - while (frame.decode(buf)) { - // Check for deliver close here so we can erase the - // connection decoder safely in this thread. - if (frame.getMethod()->isA<ClusterConnectionDeliverCloseBody>()) - decoder.erase(e.getConnectionId()); - deliverFrameQueue.push(EventFrame(e, frame)); - } + Mutex::ScopedLock l(lock); + if (e.isCluster()) { // Cluster control, process in this thread. + AMQFrame frame(e.getFrame()); + ClusterDispatcher dispatch(*this, e.getConnectionId().getMember(), l); + if (!framing::invoke(dispatch, *frame.getBody()).wasHandled()) + throw Exception(QPID_MSG("Invalid cluster control")); } - else if (e.getType() == DATA) - decoder.decode(e, e.getData()); + else if (state >= CATCHUP) { // Connection frame, push onto deliver queue. + if (e.getType() == CONTROL) + connectionFrame(EventFrame(e, e.getFrame())); + else + connections.decode(e, e.getData()); + } + else // connection frame && state < CATCHUP. Drop. + QPID_LOG(trace, *this << " DROP: " << e); } // Handler for deliverFrameQueue void Cluster::deliveredFrame(const EventFrame& e) { - Mutex::ScopedLock l(lock); - const_cast<AMQFrame&>(e.frame).setClusterId(frameId++); + Mutex::ScopedLock l(lock); // TODO aconway 2009-03-02: don't need this lock? + assert(!e.isCluster()); // Only connection frames on this queue. QPID_LOG(trace, *this << " DLVR: " << e); - QPID_LATENCY_RECORD("delivered frame queue", e.frame); - if (e.isCluster()) { // Cluster control frame - ClusterDispatcher dispatch(*this, e.connectionId.getMember(), l); - if (!framing::invoke(dispatch, *e.frame.getBody()).wasHandled()) - throw Exception(QPID_MSG("Invalid cluster control")); - } - else { // Connection frame. - if (state <= UPDATEE) { - QPID_LOG(trace, *this << " DROP: " << e); - return; - } - boost::intrusive_ptr<Connection> connection = connections.get(e.connectionId); - if (connection) // Ignore frames to closed local connections. - connection->deliveredFrame(e); - } - QPID_LATENCY_RECORD("processed", e.frame); + if (e.type == DATA) // Sequence number to identify data frames. + const_cast<AMQFrame&>(e.frame).setClusterId(frameId++); + boost::intrusive_ptr<Connection> connection = connections.get(e.connectionId); + if (connection) // Ignore frames to closed local connections. + connection->deliveredFrame(e); } struct AddrList { @@ -310,7 +301,7 @@ std::string addresses; for (cpg_address* p = current; p < current+nCurrent; ++p) addresses.append(MemberId(*p).str()); - deliver(Event::control(ClusterConfigChangeBody(ProtocolVersion(), addresses), myId)); + deliver(Event::control(ClusterConfigChangeBody(ProtocolVersion(), addresses), self)); } void Cluster::setReady(Lock&) { @@ -323,7 +314,7 @@ bool memberChange = map.configChange(addresses); if (state == LEFT) return; - if (!map.isAlive(myId)) { // Final config change. + if (!map.isAlive(self)) { // Final config change. leave(l); return; } @@ -332,16 +323,16 @@ if (map.aliveCount() == 1) { setClusterId(true); setReady(l); - map = ClusterMap(myId, myUrl, true); + map = ClusterMap(self, myUrl, true); memberUpdate(l); QPID_LOG(notice, *this << " first in cluster"); } else { // Joining established group. state = JOINER; QPID_LOG(info, *this << " joining cluster: " << map); - mcast.mcastControl(ClusterUpdateRequestBody(ProtocolVersion(), myUrl.str()), myId); + mcast.mcastControl(ClusterUpdateRequestBody(ProtocolVersion(), myUrl.str()), self); elders = map.getAlive(); - elders.erase(myId); + elders.erase(self); broker.getLinks().setPassive(true); } } @@ -361,7 +352,7 @@ if (state == READY && map.isJoiner(id)) { state = OFFER; QPID_LOG(info, *this << " send update-offer to " << id); - mcast.mcastControl(ClusterUpdateOfferBody(ProtocolVersion(), id, clusterId), myId); + mcast.mcastControl(ClusterUpdateOfferBody(ProtocolVersion(), id, clusterId), self); } } @@ -388,17 +379,29 @@ void Cluster::ready(const MemberId& id, const std::string& url, Lock& l) { if (map.ready(id, Url(url))) memberUpdate(l); - if (state == CATCHUP && id == myId) { + if (state == CATCHUP && id == self) { setReady(l); QPID_LOG(notice, *this << " caught up, active cluster member"); } } +void Cluster::stall(Lock&) { + // Stop processing the deliveredEventQueue in order to send or + // recieve an update. + deliverEventQueue.stop(); +} + +void Cluster::unstall(Lock&) { + // Stop processing the deliveredEventQueue in order to send or + // recieve an update. + deliverEventQueue.start(); +} + void Cluster::updateOffer(const MemberId& updater, uint64_t updateeInt, const Uuid& uuid, Lock& l) { if (state == LEFT) return; MemberId updatee(updateeInt); boost::optional<Url> url = map.updateOffer(updater, updatee); - if (updater == myId) { + if (updater == self) { assert(state == OFFER); if (url) { // My offer was first. updateStart(updatee, *url, l); @@ -409,29 +412,29 @@ makeOffer(map.firstJoiner(), l); // Maybe make another offer. } } - else if (updatee == myId && url) { + else if (updatee == self && url) { assert(state == JOINER); setClusterId(uuid); state = UPDATEE; QPID_LOG(info, *this << " receiving update from " << updater); - deliverFrameQueue.stop(); + stall(l); checkUpdateIn(l); } } -void Cluster::updateStart(const MemberId& updatee, const Url& url, Lock&) { +void Cluster::updateStart(const MemberId& updatee, const Url& url, Lock& l) { if (state == LEFT) return; assert(state == OFFER); state = UPDATER; QPID_LOG(info, *this << " stall for update to " << updatee << " at " << url); - deliverFrameQueue.stop(); + stall(l); if (updateThread.id()) updateThread.join(); // Join the previous updatethread. client::ConnectionSettings cs; cs.username = settings.username; cs.password = settings.password; cs.mechanism = settings.mechanism; updateThread = Thread( - new UpdateClient(myId, updatee, url, broker, map, frameId, connections.values(), + new UpdateClient(self, updatee, url, broker, map, frameId, connections.values(), boost::bind(&Cluster::updateOutDone, this), boost::bind(&Cluster::updateOutError, this, _1), cs)); @@ -445,13 +448,13 @@ checkUpdateIn(l); } -void Cluster::checkUpdateIn(Lock& ) { +void Cluster::checkUpdateIn(Lock& l) { if (state == UPDATEE && updatedMap) { map = *updatedMap; - mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), myId); + mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), self); state = CATCHUP; QPID_LOG(info, *this << " received update, starting catch-up"); - deliverFrameQueue.start(); + unstall(l); } } @@ -465,7 +468,7 @@ assert(state == UPDATER); state = READY; mcast.release(); - deliverFrameQueue.start(); + unstall(l); makeOffer(map.firstJoiner(), l); // Try another offer } @@ -490,7 +493,7 @@ { _qmf::ArgsClusterStopClusterNode& iargs = (_qmf::ArgsClusterStopClusterNode&) args; stringstream stream; - stream << myId; + stream << self; if (iargs.i_brokerId == stream.str()) stopClusterNode(l); } @@ -511,7 +514,7 @@ void Cluster::stopFullCluster(Lock& ) { QPID_LOG(notice, *this << " shutting down cluster " << name); - mcast.mcastControl(ClusterShutdownBody(), myId); + mcast.mcastControl(ClusterShutdownBody(), self); } void Cluster::memberUpdate(Lock& l) { @@ -522,12 +525,12 @@ failoverExchange->setUrls(urls); if (size == 1 && lastSize > 1 && state >= CATCHUP) { - QPID_LOG(info, *this << " last broker standing, update queue policies"); + QPID_LOG(notice, *this << " last broker standing, update queue policies"); lastBroker = true; broker.getQueues().updateQueueClusterState(true); } else if (size > 1 && lastBroker) { - QPID_LOG(info, *this << " last broker standing joined by " << size-1 << " replicas, updating queue policies" << size); + QPID_LOG(notice, *this << " last broker standing joined by " << size-1 << " replicas, updating queue policies" << size); lastBroker = false; broker.getQueues().updateQueueClusterState(false); } @@ -549,17 +552,25 @@ mgmtObject->set_memberIDs(idstr); } - // Close connections belonging to members that have now been excluded - connections.update(myId, map); + // Generate a deliver-close control frame for connections + // belonging to defunct members, so they will be erased in the + // deliverFrameQueue thread. + ConnectionMap::Vector c = connections.values(); + for (ConnectionMap::Vector::iterator i = c.begin(); i != c.end(); ++i) { + ConnectionId cid = (*i)->getId(); + MemberId mid = cid.getMember(); + if (mid != self && !map.isMember(mid)) + connectionFrame(EventFrame(EventHeader(CONTROL, cid), AMQFrame(ClusterConnectionDeliverCloseBody()))); + } } std::ostream& operator<<(std::ostream& o, const Cluster& cluster) { static const char* STATE[] = { "INIT", "JOINER", "UPDATEE", "CATCHUP", "READY", "OFFER", "UPDATER", "LEFT" }; - return o << cluster.myId << "(" << STATE[cluster.state] << ")"; + return o << cluster.self << "(" << STATE[cluster.state] << ")"; } MemberId Cluster::getId() const { - return myId; // Immutable, no need to lock. + return self; // Immutable, no need to lock. } broker::Broker& Cluster::getBroker() const { @@ -578,7 +589,7 @@ clusterId = uuid; if (mgmtObject) { stringstream stream; - stream << myId; + stream << self; mgmtObject->set_clusterID(clusterId.str()); mgmtObject->set_memberID(stream.str()); } @@ -589,4 +600,11 @@ expiryPolicy->deliverExpire(id); } +void Cluster::connectionFrame(const EventFrame& frame) { + // FIXME aconway 2009-03-02: bypassing deliverFrameQueue to avoid race condition. + // Measure performance impact, restore with better locking. + // deliverFrameQueue.push(frame); + deliveredFrame(frame); +} + }} // namespace qpid::cluster Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h?rev=749473&r1=749472&r2=749473&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h Mon Mar 2 23:30:08 2009 @@ -30,7 +30,6 @@ #include "NoOpConnectionOutputHandler.h" #include "PollerDispatch.h" #include "Quorum.h" -#include "Decoder.h" #include "PollableQueue.h" #include "ExpiryPolicy.h" @@ -102,7 +101,10 @@ size_t getWriteEstimate() { return writeEstimate; } bool isLeader() const; // Called in deliver thread. - + + // Called by Connection in deliver event thread with decoded connection data frames. + void connectionFrame(const EventFrame&); + private: typedef sys::Monitor::ScopedLock Lock; @@ -125,7 +127,7 @@ void brokerShutdown(); // Cluster controls implement XML methods from cluster.xml. - // Called in deliver thread. + // Called in deliveredEvent thread. // void updateRequest(const MemberId&, const std::string&, Lock&); void updateOffer(const MemberId& updater, uint64_t updatee, const framing::Uuid&, Lock&); @@ -134,6 +136,10 @@ void messageExpired(const MemberId&, uint64_t, Lock& l); void shutdown(const MemberId&, Lock&); + // Used by cluster controls. + void stall(Lock&); + void unstall(Lock&); + // Handlers for pollable queues. void deliveredEvent(const Event&); void deliveredFrame(const EventFrame&); @@ -141,6 +147,10 @@ // Helper, called in deliver thread. void updateStart(const MemberId& updatee, const Url& url, Lock&); + // Called in event deliver thread to check for update status. + bool isUpdateComplete(const EventFrame&); + bool isUpdateComplete(); + void setReady(Lock&); void deliver( // CPG deliver callback. @@ -186,7 +196,7 @@ Cpg cpg; const std::string name; Url myUrl; - const MemberId myId; + const MemberId self; const size_t readMax; const size_t writeEstimate; framing::Uuid clusterId; @@ -201,9 +211,6 @@ boost::shared_ptr<FailoverExchange> failoverExchange; Quorum quorum; - // Used only in deliverdEvent thread - Decoder decoder; - // Used only in deliveredFrame thread ClusterMap::Set elders; boost::intrusive_ptr<ExpiryPolicy> expiryPolicy; Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp?rev=749473&r1=749472&r2=749473&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp Mon Mar 2 23:30:08 2009 @@ -40,6 +40,7 @@ #include "qpid/framing/ConnectionCloseOkBody.h" #include "qpid/log/Statement.h" #include "qpid/sys/LatencyMetric.h" +#include "qpid/sys/AtomicValue.h" #include <boost/current_function.hpp> @@ -58,19 +59,22 @@ NoOpConnectionOutputHandler Connection::discardHandler; -// Shadow connections -Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out, - const std::string& wrappedId, ConnectionId myId) - : cluster(c), self(myId), catchUp(false), output(*this, out), - connection(&output, cluster.getBroker(), wrappedId), expectProtocolHeader(false), +namespace { +sys::AtomicValue<uint64_t> idCounter; +} + +// Shadow connection +Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out, const std::string& logId, const ConnectionId& id) + : cluster(c), self(id), catchUp(false), output(*this, out), + connection(&output, cluster.getBroker(), logId), expectProtocolHeader(false), mcastFrameHandler(cluster.getMulticast(), self) { init(); } -// Local connections +// Local connection Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out, - const std::string& wrappedId, MemberId myId, bool isCatchUp, bool isLink) - : cluster(c), self(myId, this), catchUp(isCatchUp), output(*this, out), - connection(&output, cluster.getBroker(), wrappedId, isLink, catchUp ? ++catchUpId : 0), + const std::string& logId, MemberId member, bool isCatchUp, bool isLink) + : cluster(c), self(member, ++idCounter), catchUp(isCatchUp), output(*this, out), + connection(&output, cluster.getBroker(), logId, isLink, catchUp ? ++catchUpId : 0), expectProtocolHeader(isLink), mcastFrameHandler(cluster.getMulticast(), self) { init(); } @@ -149,12 +153,9 @@ if (!framing::invoke(*this, *f.frame.getBody()).wasHandled() // Connection contol. && !checkUnsupported(*f.frame.getBody())) // Unsupported operation. { - // FIXME aconway 2009-02-24: Using the DATA/CONTROL - // distinction to distinguish incoming vs. outgoing frames is - // very unclear. if (f.type == DATA) // incoming data frames to broker::Connection connection.received(const_cast<AMQFrame&>(f.frame)); - else { // outgoing data frame, send via SessionState + else { // frame control, send frame via SessionState broker::SessionState* ss = connection.getChannel(f.frame.getChannel()).getSession(); if (ss) ss->out(const_cast<AMQFrame&>(f.frame)); } @@ -200,12 +201,12 @@ connection.closed(); } -// Decode data from local clients. +// ConnectoinCodec::decode receives read buffers from directly-connected clients. size_t Connection::decode(const char* buffer, size_t size) { if (catchUp) { // Handle catch-up locally. Buffer buf(const_cast<char*>(buffer), size); while (localDecoder.decode(buf)) - received(localDecoder.frame); + received(localDecoder.getFrame()); } else { // Multicast local connections. assert(isLocal()); @@ -233,6 +234,29 @@ return size; } +// Decode a data event, a read buffer that has been delivered by the cluster. +void Connection::decode(const EventHeader& eh, const void* data) { + assert(eh.getType() == DATA); // Only handle connection data events. + const char* cp = static_cast<const char*>(data); + Buffer buf(const_cast<char*>(cp), eh.getSize()); + if (clusterDecoder.decode(buf)) { // Decoded a frame + AMQFrame frame(clusterDecoder.getFrame()); + while (clusterDecoder.decode(buf)) { + cluster.connectionFrame(EventFrame(eh, frame)); + frame = clusterDecoder.getFrame(); + } + // Set read-credit on the last frame ending in this event. + // Credit will be given when this frame is processed. + cluster.connectionFrame(EventFrame(eh, frame, 1)); + } + else { + // We must give 1 unit read credit per event. + // This event does not complete any frames so + // we give read credit directly. + giveReadCredit(1); + } +} + broker::SessionState& Connection::sessionState() { return *connection.getChannel(currentChannel).getSession(); } @@ -267,11 +291,12 @@ QPID_LOG(debug, cluster << " received session state update for " << sessionState().getId()); } -void Connection::shadowReady(uint64_t memberId, uint64_t connectionId, const string& username) { - ConnectionId shadow = ConnectionId(memberId, connectionId); - QPID_LOG(debug, cluster << " catch-up connection " << *this << " becomes shadow " << shadow); - self = shadow; +void Connection::shadowReady(uint64_t memberId, uint64_t connectionId, const string& username, const string& fragment) { + ConnectionId shadowId = ConnectionId(memberId, connectionId); + QPID_LOG(debug, cluster << " catch-up connection " << *this << " becomes shadow " << shadowId); + self = shadowId; connection.setUserId(username); + clusterDecoder.setFragment(fragment.data(), fragment.size()); } void Connection::membership(const FieldTable& joiners, const FieldTable& members, uint64_t frameId) { @@ -281,7 +306,7 @@ } bool Connection::isLocal() const { - return self.first == cluster.getId() && self.second == this; + return self.first == cluster.getId() && self.second; } bool Connection::isShadow() const { Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h?rev=749473&r1=749472&r2=749473&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h Mon Mar 2 23:30:08 2009 @@ -64,10 +64,10 @@ public: typedef sys::PollableQueue<EventFrame> PollableFrameQueue; - /** Local connection, use this in ConnectionId */ - Connection(Cluster&, sys::ConnectionOutputHandler& out, const std::string& id, MemberId, bool catchUp, bool isLink); - /** Shadow connection */ - Connection(Cluster&, sys::ConnectionOutputHandler& out, const std::string& id, ConnectionId); + /** Local connection. */ + Connection(Cluster&, sys::ConnectionOutputHandler& out, const std::string& logId, MemberId, bool catchUp, bool isLink); + /** Shadow connection. */ + Connection(Cluster&, sys::ConnectionOutputHandler& out, const std::string& logId, const ConnectionId& id); ~Connection(); ConnectionId getId() const { return self; } @@ -100,9 +100,12 @@ /** Called if the connectors member has left the cluster */ void left(); - // ConnectionCodec methods + // ConnectionCodec methods - called by IO layer with a read buffer. size_t decode(const char* buffer, size_t size); + // Decode a data event, a read buffer that has been delivered by the cluster. + void decode(const EventHeader& eh, const void* data); + // Called for data delivered from the cluster. void deliveredFrame(const EventFrame&); @@ -118,7 +121,7 @@ const framing::SequenceNumber& received, const framing::SequenceSet& unknownCompleted, const SequenceSet& receivedIncomplete); - void shadowReady(uint64_t memberId, uint64_t connectionId, const std::string& username); + void shadowReady(uint64_t memberId, uint64_t connectionId, const std::string& username, const std::string& fragment); void membership(const framing::FieldTable&, const framing::FieldTable&, uint64_t frameId); @@ -149,7 +152,9 @@ void exchange(const std::string& encoded); void giveReadCredit(int credit); - + + framing::FrameDecoder& getDecoder() { return clusterDecoder; } + private: struct NullFrameHandler : public framing::FrameHandler { void handle(framing::AMQFrame&) {} @@ -174,6 +179,7 @@ WriteEstimate writeEstimate; OutputInterceptor output; framing::FrameDecoder localDecoder; + framing::FrameDecoder clusterDecoder; broker::Connection connection; framing::SequenceNumber deliverSeq; framing::ChannelId currentChannel; Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp?rev=749473&r1=749472&r2=749473&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp Mon Mar 2 23:30:08 2009 @@ -46,16 +46,13 @@ // Used for outgoing Link connections, we don't care. sys::ConnectionCodec* -ConnectionCodec::Factory::create(sys::OutputControl& out, const std::string& id) { - return new ConnectionCodec(out, id, cluster, false, true); - //return next->create(out, id); +ConnectionCodec::Factory::create(sys::OutputControl& out, const std::string& logId) { + return new ConnectionCodec(out, logId, cluster, false, true); } -ConnectionCodec::ConnectionCodec(sys::OutputControl& out, const std::string& id, Cluster& cluster, bool catchUp, bool isLink) - : codec(out, id, isLink), - interceptor(new Connection(cluster, codec, id, cluster.getId(), catchUp, isLink)), - id(interceptor->getId()), - localId(id) +ConnectionCodec::ConnectionCodec(sys::OutputControl& out, const std::string& logId, Cluster& cluster, bool catchUp, bool isLink) + : codec(out, logId, isLink), + interceptor(new Connection(cluster, codec, logId, cluster.getId(), catchUp, isLink)) { std::auto_ptr<sys::ConnectionInputHandler> ih(new ProxyInputHandler(interceptor)); codec.setInputHandler(ih); Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.h?rev=749473&r1=749472&r2=749473&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.h Mon Mar 2 23:30:08 2009 @@ -56,7 +56,7 @@ sys::ConnectionCodec* create(sys::OutputControl&, const std::string& id); }; - ConnectionCodec(sys::OutputControl& out, const std::string& id, Cluster& c, bool catchUp, bool isLink); + ConnectionCodec(sys::OutputControl& out, const std::string& logId, Cluster& c, bool catchUp, bool isLink); ~ConnectionCodec(); // ConnectionCodec functions. @@ -71,8 +71,6 @@ private: amqp_0_10::Connection codec; boost::intrusive_ptr<cluster::Connection> interceptor; - cluster::ConnectionId id; - std::string localId; }; }} // namespace qpid::cluster Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionMap.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionMap.cpp?rev=749473&r1=749472&r2=749473&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionMap.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionMap.cpp Mon Mar 2 23:30:08 2009 @@ -38,9 +38,9 @@ void ConnectionMap::erase(const ConnectionId& id) { Lock l(lock); - Map::iterator i = map.find(id); - QPID_ASSERT(i != map.end()); - map.erase(i); + size_t erased = map.erase(id); + assert(erased); + (void)erased; // Avoid unused variable warnings. } ConnectionMap::ConnectionPtr ConnectionMap::get(const ConnectionId& id) { @@ -61,13 +61,6 @@ return i->second; } -ConnectionMap::ConnectionPtr ConnectionMap::getLocal(const ConnectionId& id) { - Lock l(lock); - if (id.getMember() != cluster.getId()) return 0; - Map::const_iterator i = map.find(id); - return i == map.end() ? 0 : i->second; -} - ConnectionMap::Vector ConnectionMap::values() const { Lock l(lock); Vector result(map.size()); @@ -76,22 +69,16 @@ return result; } -void ConnectionMap::update(MemberId myId, const ClusterMap& cluster) { - Lock l(lock); - for (Map::iterator i = map.begin(); i != map.end(); ) { - MemberId member = i->first.getMember(); - if (member != myId && !cluster.isMember(member)) { - i->second->left(); - map.erase(i++); - } else { - i++; - } - } -} - void ConnectionMap::clear() { Lock l(lock); map.clear(); } +void ConnectionMap::decode(const EventHeader& eh, const void* data) { + ConnectionPtr connection = get(eh.getConnectionId()); + if (connection) + connection->decode(eh, data); +} + + }} // namespace qpid::cluster Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionMap.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionMap.h?rev=749473&r1=749472&r2=749473&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionMap.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionMap.h Mon Mar 2 23:30:08 2009 @@ -60,18 +60,13 @@ */ ConnectionPtr get(const ConnectionId& id); - /** If ID is a local connection and in the map return it, else return 0 */ - ConnectionPtr getLocal(const ConnectionId& id); - /** Get connections for sending an update. */ Vector values() const; - /** Remove connections who's members are no longer in the cluster. Deliver thread. */ - void update(MemberId myId, const ClusterMap& cluster); + /** Decode a connection data event. */ + void decode(const EventHeader& eh, const void* data); - void clear(); - size_t size() const; private: Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp?rev=749473&r1=749472&r2=749473&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp Mon Mar 2 23:30:08 2009 @@ -23,6 +23,7 @@ #include "Cpg.h" #include "qpid/framing/Buffer.h" #include "qpid/framing/AMQFrame.h" +#include "qpid/assert.h" #include <ostream> #include <iterator> #include <algorithm> @@ -31,6 +32,7 @@ namespace cluster { using framing::Buffer; +using framing::AMQFrame; const size_t EventHeader::HEADER_SIZE = sizeof(uint8_t) + // type @@ -57,7 +59,7 @@ type = (EventType)buf.getOctet(); if(type != DATA && type != CONTROL) throw Exception("Invalid multicast event type"); - connectionId = ConnectionId(m, reinterpret_cast<Connection*>(buf.getLongLong())); + connectionId = ConnectionId(m, buf.getLongLong()); size = buf.getLong(); #ifdef QPID_LATENCY_METRIC latency_metric_timestamp = buf.getLongLong(); @@ -93,7 +95,7 @@ void EventHeader::encode(Buffer& b) const { b.putOctet(type); - b.putLongLong(reinterpret_cast<uint64_t>(connectionId.getPointer())); + b.putLongLong(connectionId.getNumber()); b.putLong(size); #ifdef QPID_LATENCY_METRIC b.putLongLong(latency_metric_timestamp); @@ -111,6 +113,14 @@ return Buffer(const_cast<char*>(getData()), getSize()); } +AMQFrame Event::getFrame() const { + assert(type == CONTROL); + Buffer buf(*this); + AMQFrame frame; + QPID_ASSERT(frame.decode(buf)); + return frame; +} + static const char* EVENT_TYPE_NAMES[] = { "data", "control" }; std::ostream& operator << (std::ostream& o, EventType t) { Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h?rev=749473&r1=749472&r2=749473&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h Mon Mar 2 23:30:08 2009 @@ -24,6 +24,7 @@ #include "types.h" #include "qpid/RefCountedBuffer.h" +#include "qpid/framing/AMQFrame.h" #include "qpid/sys/LatencyMetric.h" #include <sys/uio.h> // For iovec #include <iosfwd> @@ -59,8 +60,8 @@ uint64_t getSequence() const { return sequence; } void setSequence(uint64_t n) { sequence = n; } - bool isCluster() const { return connectionId.getPointer() == 0; } - bool isConnection() const { return connectionId.getPointer() != 0; } + bool isCluster() const { return connectionId.getNumber() == 0; } + bool isConnection() const { return connectionId.getNumber() != 0; } protected: static const size_t HEADER_SIZE; @@ -97,6 +98,8 @@ // Store including header char* getStore() { return store; } const char* getStore() const { return store; } + + framing::AMQFrame getFrame() const; operator framing::Buffer() const; Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/EventFrame.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/EventFrame.h?rev=749473&r1=749472&r2=749473&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/EventFrame.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/EventFrame.h Mon Mar 2 23:30:08 2009 @@ -42,8 +42,8 @@ EventFrame(const EventHeader& e, const framing::AMQFrame& f, int rc=0); - bool isCluster() const { return !connectionId.getPointer(); } - bool isConnection() const { return connectionId.getPointer(); } + bool isCluster() const { return connectionId.getNumber() == 0; } + bool isConnection() const { return connectionId.getNumber() != 0; } bool isLastInEvent() const { return readCredit; } Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp?rev=749473&r1=749472&r2=749473&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp Mon Mar 2 23:30:08 2009 @@ -95,7 +95,7 @@ : updaterId(updater), updateeId(updatee), updateeUrl(url), updaterBroker(broker), map(m), frameId(frameId_), connections(cons), connection(catchUpConnection()), shadowConnection(catchUpConnection()), - done(ok), failed(fail) + done(ok), failed(fail), connectionSettings(cs) { connection.open(url, cs); session = connection.newSession("update_shared"); @@ -228,13 +228,15 @@ shadowConnection = catchUpConnection(); broker::Connection& bc = updateConnection->getBrokerConnection(); - // FIXME aconway 2008-10-20: What authentication info to use on reconnect? - shadowConnection.open(updateeUrl, bc.getUserId(), ""/*password*/, "/"/*vhost*/, bc.getFrameMax()); + connectionSettings.maxFrameSize = bc.getFrameMax(); + shadowConnection.open(updateeUrl, connectionSettings); bc.eachSessionHandler(boost::bind(&UpdateClient::updateSession, this, _1)); + std::pair<const char*, size_t> fragment = updateConnection->getDecoder().getFragment(); ClusterConnectionProxy(shadowConnection).shadowReady( updateConnection->getId().getMember(), - reinterpret_cast<uint64_t>(updateConnection->getId().getPointer()), - updateConnection->getBrokerConnection().getUserId() + updateConnection->getId().getNumber(), + bc.getUserId(), + string(fragment.first, fragment.second) ); shadowConnection.close(); QPID_LOG(debug, updaterId << " updated connection " << *updateConnection); @@ -285,9 +287,6 @@ if (inProgress) { inProgress->getFrames().map(simpl->out); } - - // FIXME aconway 2008-09-23: update session replay list. - QPID_LOG(debug, updaterId << " updated session " << sh.getSession()->getId()); } Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.h?rev=749473&r1=749472&r2=749473&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.h Mon Mar 2 23:30:08 2009 @@ -98,6 +98,7 @@ client::AsyncSession session, shadowSession; boost::function<void()> done; boost::function<void(const std::exception& e)> failed; + client::ConnectionSettings connectionSettings; }; }} // namespace qpid::cluster Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/types.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/types.h?rev=749473&r1=749472&r2=749473&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/types.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/types.h Mon Mar 2 23:30:08 2009 @@ -68,17 +68,16 @@ std::ostream& operator<<(std::ostream&, const MemberId&); -struct ConnectionId : public std::pair<MemberId, Connection*> { - ConnectionId(const MemberId& m=MemberId(), Connection* c=0) : std::pair<MemberId, Connection*> (m,c) {} - ConnectionId(uint64_t m, uint64_t c) - : std::pair<MemberId, Connection*>(MemberId(m), reinterpret_cast<Connection*>(c)) {} +struct ConnectionId : public std::pair<MemberId, uint64_t> { + ConnectionId(const MemberId& m=MemberId(), uint64_t c=0) : std::pair<MemberId, uint64_t> (m,c) {} + ConnectionId(uint64_t m, uint64_t c) : std::pair<MemberId, uint64_t>(MemberId(m), c) {} MemberId getMember() const { return first; } - Connection* getPointer() const { return second; } + uint64_t getNumber() const { return second; } }; std::ostream& operator<<(std::ostream&, const ConnectionId&); -std::ostream& operator << (std::ostream&, EventType); +std::ostream& operator<<(std::ostream&, EventType); }} // namespace qpid::cluster Modified: qpid/trunk/qpid/cpp/src/qpid/framing/FrameDecoder.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/framing/FrameDecoder.cpp?rev=749473&r1=749472&r2=749473&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/framing/FrameDecoder.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/framing/FrameDecoder.cpp Mon Mar 2 23:30:08 2009 @@ -21,8 +21,9 @@ #include "FrameDecoder.h" #include "Buffer.h" #include "qpid/log/Statement.h" -#include <algorithm> #include "qpid/framing/reply_exceptions.h" +#include <algorithm> +#include <string.h> namespace qpid { namespace framing { @@ -67,4 +68,13 @@ return false; } +void FrameDecoder::setFragment(const char* data, size_t size) { + fragment.resize(size); + ::memcpy(fragment.data(), data, size); +} + +std::pair<const char*, size_t> FrameDecoder::getFragment() const { + return std::pair<const char*, size_t>(fragment.data(), fragment.size()); +} + }} // namespace qpid::framing Modified: qpid/trunk/qpid/cpp/src/qpid/framing/FrameDecoder.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/framing/FrameDecoder.h?rev=749473&r1=749472&r2=749473&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/framing/FrameDecoder.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/framing/FrameDecoder.h Mon Mar 2 23:30:08 2009 @@ -35,9 +35,16 @@ { public: bool decode(Buffer& buffer); - AMQFrame frame; + const AMQFrame& getFrame() const { return frame; } + AMQFrame& getFrame() { return frame; } + + void setFragment(const char*, size_t); + std::pair<const char*, size_t> getFragment() const; + private: std::vector<char> fragment; + AMQFrame frame; + }; }} // namespace qpid::framing Modified: qpid/trunk/qpid/cpp/src/tests/ClusterFixture.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/ClusterFixture.cpp?rev=749473&r1=749472&r2=749473&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/tests/ClusterFixture.cpp (original) +++ qpid/trunk/qpid/cpp/src/tests/ClusterFixture.cpp Mon Mar 2 23:30:08 2009 @@ -109,7 +109,7 @@ Args args(makeArgs(prefix)); vector<const char*> argv(args.size()); transform(args.begin(), args.end(), argv.begin(), boost::bind(&string::c_str, _1)); - qpid::log::Logger::instance().setPrefix(os.str()); + qpid::log::Logger::instance().setPrefix(prefix); localBroker.reset(new BrokerFixture(parseOpts(argv.size(), &argv[0]))); push_back(localBroker->getPort()); forkedBrokers.push_back(shared_ptr<ForkedBroker>()); Modified: qpid/trunk/qpid/cpp/src/tests/FrameDecoder.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/FrameDecoder.cpp?rev=749473&r1=749472&r2=749473&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/tests/FrameDecoder.cpp (original) +++ qpid/trunk/qpid/cpp/src/tests/FrameDecoder.cpp Mon Mar 2 23:30:08 2009 @@ -65,7 +65,7 @@ } Buffer buf(&encoded[encoded.size()-1], 1); BOOST_CHECK(decoder.decode(buf)); - BOOST_CHECK_EQUAL(data, getData(decoder.frame)); + BOOST_CHECK_EQUAL(data, getData(decoder.getFrame())); } Modified: qpid/trunk/qpid/cpp/xml/cluster.xml URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/xml/cluster.xml?rev=749473&r1=749472&r2=749473&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/xml/cluster.xml (original) +++ qpid/trunk/qpid/cpp/xml/cluster.xml Mon Mar 2 23:30:08 2009 @@ -125,6 +125,7 @@ <field name="member-id" type="uint64"/> <field name="connection-id" type="uint64"/> <field name="user-name" type="str8"/> + <field name="fragment" type="str32"/> </control> <!-- Complete a cluster state update. --> --------------------------------------------------------------------- Apache Qpid - AMQP Messaging Implementation Project: http://qpid.apache.org Use/Interact: mailto:commits-subscr...@qpid.apache.org