Author: ritchiem Date: Wed Oct 28 15:38:33 2009 New Revision: 830622 URL: http://svn.apache.org/viewvc?rev=830622&view=rev Log: Fix problems with sessions going out of scope and session numbers wrapping around.
Fixes QPID-1789: sessions that go out of scope without being detached will detach themselves. Also fixes several issues that arise when the session numbers wrap around and start re-using old numbers. Modified: qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.h qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/client/ConnectionImpl.cpp qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/client/SessionImpl.cpp qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/client/SessionImpl.h qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/cluster/UpdateClient.cpp Modified: qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp?rev=830622&r1=830621&r2=830622&view=diff ============================================================================== --- qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp (original) +++ qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp Wed Oct 28 15:38:33 2009 @@ -35,14 +35,14 @@ using namespace std; void SessionHandler::checkAttached() { - if (!getState()) { - ignoring = true; + if (!getState()) throw NotAttachedException(QPID_MSG("Channel " << channel.get() << " is not attached")); - } } SessionHandler::SessionHandler(FrameHandler* out, ChannelId ch) - : channel(ch, out), peer(channel), ignoring(false), sendReady(), receiveReady() {} + : channel(ch, out), peer(channel), + awaitingDetached(false), + sendReady(), receiveReady() {} SessionHandler::~SessionHandler() {} @@ -50,7 +50,7 @@ bool isSessionControl(AMQMethodBody* m) { return m && m->amqpClassId() == SESSION_CLASS_ID; -} + } bool isSessionDetachedControl(AMQMethodBody* m) { return isSessionControl(m) && m->amqpMethodId() == SESSION_DETACHED_METHOD_ID; @@ -76,12 +76,13 @@ // Note on channel states: a channel is attached if session != 0 AMQMethodBody* m = f.getBody()->getMethod(); 
try { - if (ignoring && !isSessionDetachedControl(m)) - return; - else if (isSessionControl(m)) + if (isSessionControl(m)) { invoke(*m); + } else { - checkAttached(); + // Drop frames if we are awaiting a detached control or + // if we are currently detached. + if (awaitingDetached || !getState()) return; if (!receiveReady) throw IllegalStateException(QPID_MSG(getState()->getId() << ": Not ready to receive data")); if (!getState()->receiverRecord(f)) @@ -142,11 +143,11 @@ // Save the name for possible session-busy exception. Session-busy // can be thrown before we have attached the handler to a valid // SessionState, and in that case we need the name to send peer.detached - name = name_; + name = name_; if (getState() && name == getState()->getId().getName()) return; // Idempotent if (getState()) - throw TransportBusyException( + throw TransportBusyException( QPID_MSG("Channel " << channel.get() << " already attached to " << getState()->getId())); setState(name, force); QPID_LOG(debug, "Attached channel " << channel.get() << " to " << getState()->getId()); @@ -157,8 +158,8 @@ sendCommandPoint(getState()->senderGetCommandPoint()); } -#define CHECK_NAME(NAME, MSG) do { \ - checkAttached(); \ +#define CHECK_NAME(NAME, MSG) do { \ + checkAttached(); \ if (NAME != getState()->getId().getName()) \ throw InvalidArgumentException( \ QPID_MSG(MSG << ": incorrect session name: " << NAME \ @@ -178,7 +179,7 @@ void SessionHandler::detached(const std::string& name, uint8_t code) { CHECK_NAME(name, "session.detached"); - ignoring = false; + awaitingDetached = false; if (code != session::DETACH_CODE_NORMAL) channelException(convert(code), "session.detached from peer."); else { @@ -273,7 +274,7 @@ void SessionHandler::sendDetach() { checkAttached(); - ignoring = true; + awaitingDetached = true; peer.detach(getState()->getId().getName()); } Modified: qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.h URL: 
http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.h?rev=830622&r1=830621&r2=830622&view=diff ============================================================================== --- qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.h (original) +++ qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.h Wed Oct 28 15:38:33 2009 @@ -101,14 +101,15 @@ QPID_COMMON_EXTERN virtual void handleOut(framing::AMQFrame&); framing::ChannelHandler channel; - framing::AMQP_AllProxy::Session peer; - bool ignoring; - bool sendReady, receiveReady; - std::string name; private: void checkAttached(); void sendCommandPoint(const SessionPoint&); + + framing::AMQP_AllProxy::Session peer; + std::string name; + bool awaitingDetached; + bool sendReady, receiveReady; }; }} // namespace qpid::amqp_0_10 Modified: qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/client/ConnectionImpl.cpp URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/client/ConnectionImpl.cpp?rev=830622&r1=830621&r2=830622&view=diff ============================================================================== --- qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/client/ConnectionImpl.cpp (original) +++ qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/client/ConnectionImpl.cpp Wed Oct 28 15:38:33 2009 @@ -95,11 +95,21 @@ void ConnectionImpl::addSession(const boost::shared_ptr<SessionImpl>& session, uint16_t channel) { Mutex::ScopedLock l(lock); - session->setChannel(channel == NEXT_CHANNEL ? nextChannel++ : channel); - boost::weak_ptr<SessionImpl>& s = sessions[session->getChannel()]; - boost::shared_ptr<SessionImpl> ss = s.lock(); - if (ss) throw SessionBusyException(QPID_MSG("Channel " << ss->getChannel() << " attached to " << ss->getId())); - s = session; + for (uint16_t i = 0; i < NEXT_CHANNEL; i++) { //will at most search through channels once + uint16_t c = channel == NEXT_CHANNEL ? 
nextChannel++ : channel; + boost::weak_ptr<SessionImpl>& s = sessions[c]; + boost::shared_ptr<SessionImpl> ss = s.lock(); + if (!ss) { + //channel is free, we can assign it to this session + session->setChannel(c); + s = session; + return; + } else if (channel != NEXT_CHANNEL) { + //channel is taken and was requested explicitly so don't look for another + throw SessionBusyException(QPID_MSG("Channel " << ss->getChannel() << " attached to " << ss->getId())); + } //else channel is busy, but we can keep looking for a free one + } + } void ConnectionImpl::handle(framing::AMQFrame& frame) @@ -165,7 +175,6 @@ } else { QPID_LOG(debug, "No security layer in place"); } - failover.reset(new FailoverListener(shared_from_this(), handler.knownBrokersUrls)); } @@ -246,7 +255,7 @@ { return handler; } - + std::vector<qpid::Url> ConnectionImpl::getKnownBrokers() { return failover ? failover->getKnownBrokers() : handler.knownBrokersUrls; } Modified: qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/client/SessionImpl.cpp URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/client/SessionImpl.cpp?rev=830622&r1=830621&r2=830622&view=diff ============================================================================== --- qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/client/SessionImpl.cpp (original) +++ qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/client/SessionImpl.cpp Wed Oct 28 15:38:33 2009 @@ -65,7 +65,8 @@ nextIn(0), nextOut(0), sendMsgCredit(0), - doClearDeliveryPropertiesExchange(true) + doClearDeliveryPropertiesExchange(true), + autoDetach(true) { channel.next = connectionShared.get(); } @@ -73,8 +74,11 @@ SessionImpl::~SessionImpl() { { Lock l(state); - if (state != DETACHED) { - QPID_LOG(warning, "Session was not closed cleanly"); + if (state != DETACHED && state != DETACHING) { + QPID_LOG(warning, "Session was not closed cleanly: " << id); + // Inform broker but don't wait for detached as that deadlocks. 
+ // The detached will be ignored as the channel will be invalid. + if (autoDetach) detach(); setState(DETACHED); handleClosed(); state.waitWaiters(); @@ -816,4 +820,6 @@ return connectionWeak.lock(); } +void SessionImpl::disableAutoDetach() { autoDetach = false; } + }} Modified: qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/client/SessionImpl.h URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/client/SessionImpl.h?rev=830622&r1=830621&r2=830622&view=diff ============================================================================== --- qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/client/SessionImpl.h (original) +++ qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/client/SessionImpl.h Wed Oct 28 15:38:33 2009 @@ -132,6 +132,9 @@ void setDoClearDeliveryPropertiesExchange(bool b=true) { doClearDeliveryPropertiesExchange = b; } + /** Suppress sending detach in destructor. Used by cluster to build session state */ + void disableAutoDetach(); + private: enum State { INACTIVE, @@ -247,6 +250,8 @@ bool doClearDeliveryPropertiesExchange; + bool autoDetach; + friend class client::SessionHandler; }; Modified: qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/cluster/UpdateClient.cpp URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/cluster/UpdateClient.cpp?rev=830622&r1=830621&r2=830622&view=diff ============================================================================== --- qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/cluster/UpdateClient.cpp (original) +++ qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/cluster/UpdateClient.cpp Wed Oct 28 15:38:33 2009 @@ -313,6 +313,7 @@ // Create a client session to update session state. 
boost::shared_ptr<client::ConnectionImpl> cimpl = client::ConnectionAccess::getImpl(shadowConnection); boost::shared_ptr<client::SessionImpl> simpl = cimpl->newSession(ss->getId().getName(), ss->getTimeout(), sh.getChannel()); + simpl->disableAutoDetach(); client::SessionBase_0_10Access(shadowSession).set(simpl); AMQP_AllProxy::ClusterConnection proxy(simpl->out); --------------------------------------------------------------------- Apache Qpid - AMQP Messaging Implementation Project: http://qpid.apache.org Use/Interact: mailto:commits-subscr...@qpid.apache.org