QPID-7501: free sessions and links under connection lock
Project: http://git-wip-us.apache.org/repos/asf/qpid-cpp/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-cpp/commit/5f5790e7 Tree: http://git-wip-us.apache.org/repos/asf/qpid-cpp/tree/5f5790e7 Diff: http://git-wip-us.apache.org/repos/asf/qpid-cpp/diff/5f5790e7 Branch: refs/heads/master Commit: 5f5790e7cd1c80a3ae28cb291679f6faaaa3ffc1 Parents: db6ba28 Author: Gordon Sim <g...@redhat.com> Authored: Tue Nov 8 10:07:09 2016 +0000 Committer: Gordon Sim <g...@redhat.com> Committed: Tue Nov 8 12:49:44 2016 +0000 ---------------------------------------------------------------------- src/qpid/messaging/amqp/ConnectionContext.cpp | 5 +++- src/qpid/messaging/amqp/ReceiverContext.cpp | 20 +++++++++++++- src/qpid/messaging/amqp/ReceiverContext.h | 3 +++ src/qpid/messaging/amqp/SenderContext.cpp | 28 +++++++++++++++++-- src/qpid/messaging/amqp/SenderContext.h | 3 +++ src/qpid/messaging/amqp/SessionContext.cpp | 31 ++++++++++++++++++++-- src/qpid/messaging/amqp/SessionContext.h | 1 + 7 files changed, 85 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-cpp/blob/5f5790e7/src/qpid/messaging/amqp/ConnectionContext.cpp ---------------------------------------------------------------------- diff --git a/src/qpid/messaging/amqp/ConnectionContext.cpp b/src/qpid/messaging/amqp/ConnectionContext.cpp index 5646c6c..85408de 100644 --- a/src/qpid/messaging/amqp/ConnectionContext.cpp +++ b/src/qpid/messaging/amqp/ConnectionContext.cpp @@ -193,7 +193,7 @@ void ConnectionContext::endSession(boost::shared_ptr<SessionContext> ssn) while (!(pn_session_state(ssn->session) & PN_REMOTE_CLOSED)) { wait(); } - + ssn->cleanup(); sessions.erase(ssn->getName()); } @@ -222,6 +222,9 @@ void ConnectionContext::close() } lock.wait(); } + for (SessionMap::iterator i = sessions.begin(); i != sessions.end(); ++i) { + i->second->cleanup(); + } sessions.clear(); } if (state != DISCONNECTED) { 
http://git-wip-us.apache.org/repos/asf/qpid-cpp/blob/5f5790e7/src/qpid/messaging/amqp/ReceiverContext.cpp ---------------------------------------------------------------------- diff --git a/src/qpid/messaging/amqp/ReceiverContext.cpp b/src/qpid/messaging/amqp/ReceiverContext.cpp index 427a633..f585f8b 100644 --- a/src/qpid/messaging/amqp/ReceiverContext.cpp +++ b/src/qpid/messaging/amqp/ReceiverContext.cpp @@ -40,11 +40,12 @@ ReceiverContext::ReceiverContext(pn_session_t* session, const std::string& n, co ReceiverContext::~ReceiverContext() { - if (receiver) pn_link_free(receiver); + if (!error && receiver) pn_link_free(receiver); } void ReceiverContext::setCapacity(uint32_t c) { + error.raise(); if (c != capacity) { //stop capacity = c; @@ -59,17 +60,20 @@ uint32_t ReceiverContext::getCapacity() uint32_t ReceiverContext::getAvailable() { + error.raise(); return pn_link_queued(receiver); } uint32_t ReceiverContext::getUnsettled() { + error.raise(); assert(pn_link_unsettled(receiver) >= pn_link_queued(receiver)); return pn_link_unsettled(receiver) - pn_link_queued(receiver); } void ReceiverContext::close() { + error.raise(); if (receiver) pn_link_close(receiver); } @@ -84,6 +88,7 @@ const std::string& ReceiverContext::getSource() const } void ReceiverContext::verify() { + error.raise(); pn_terminus_t* source = pn_link_remote_source(receiver); if (!helper.isNameNull() && !pn_terminus_get_address(source)) { std::string msg("No such source : "); @@ -98,10 +103,12 @@ void ReceiverContext::verify() } void ReceiverContext::configure() { + error.raise(); if (receiver) configure(pn_link_source(receiver)); } void ReceiverContext::configure(pn_terminus_t* source) { + error.raise(); helper.configure(receiver, source, AddressHelper::FOR_RECEIVER); std::string option; if (helper.getLinkTarget(option)) { @@ -124,6 +131,7 @@ void ReceiverContext::reset(pn_session_t* session) bool ReceiverContext::hasCurrent() { + error.raise(); return receiver && pn_link_current(receiver); } @@ 
-137,4 +145,14 @@ bool ReceiverContext::wakeupToIssueCredit() } } +void ReceiverContext::cleanup() +{ + if (!error && receiver) { + error = new LinkError("receiver no longer valid"); + pn_link_free(receiver); + receiver = 0; + + } +} + }}} // namespace qpid::messaging::amqp http://git-wip-us.apache.org/repos/asf/qpid-cpp/blob/5f5790e7/src/qpid/messaging/amqp/ReceiverContext.h ---------------------------------------------------------------------- diff --git a/src/qpid/messaging/amqp/ReceiverContext.h b/src/qpid/messaging/amqp/ReceiverContext.h index dd1352a..93041f1 100644 --- a/src/qpid/messaging/amqp/ReceiverContext.h +++ b/src/qpid/messaging/amqp/ReceiverContext.h @@ -25,6 +25,7 @@ #include "qpid/messaging/amqp/AddressHelper.h" #include <string> #include "qpid/sys/AtomicCount.h" +#include "qpid/sys/ExceptionHolder.h" #include "qpid/sys/IntegerTypes.h" struct pn_link_t; @@ -60,6 +61,7 @@ class ReceiverContext void verify(); Address getAddress() const; bool hasCurrent(); + void cleanup(); private: friend class ConnectionContext; const std::string name; @@ -71,6 +73,7 @@ class ReceiverContext qpid::sys::AtomicCount fetching; void configure(pn_terminus_t*); bool wakeupToIssueCredit(); + sys::ExceptionHolder error; }; }}} // namespace qpid::messaging::amqp http://git-wip-us.apache.org/repos/asf/qpid-cpp/blob/5f5790e7/src/qpid/messaging/amqp/SenderContext.cpp ---------------------------------------------------------------------- diff --git a/src/qpid/messaging/amqp/SenderContext.cpp b/src/qpid/messaging/amqp/SenderContext.cpp index a3ffb15..7ae5e2a 100644 --- a/src/qpid/messaging/amqp/SenderContext.cpp +++ b/src/qpid/messaging/amqp/SenderContext.cpp @@ -67,16 +67,18 @@ SenderContext::SenderContext(pn_session_t* session, const std::string& n, SenderContext::~SenderContext() { - if (sender) pn_link_free(sender); + if (!error && sender) pn_link_free(sender); } void SenderContext::close() { + error.raise(); if (sender) pn_link_close(sender); } void 
SenderContext::setCapacity(uint32_t c) { + error.raise(); if (c < deliveries.size()) throw qpid::messaging::SenderError("Desired capacity is less than unsettled message count!"); capacity = c; } @@ -88,6 +90,7 @@ uint32_t SenderContext::getCapacity() uint32_t SenderContext::getUnsettled() { + error.raise(); return processUnsettled(true/*always allow retrieval of unsettled count, even if link has failed*/); } @@ -103,6 +106,7 @@ const std::string& SenderContext::getTarget() const bool SenderContext::send(const qpid::messaging::Message& message, SenderContext::Delivery** out) { + error.raise(); resend();//if there are any messages needing to be resent at the front of the queue, send them first if (processUnsettled(false) < capacity && pn_link_credit(sender)) { types::Variant state; @@ -135,6 +139,7 @@ bool SenderContext::send(const qpid::messaging::Message& message, SenderContext: void SenderContext::check() { + error.raise(); if (pn_link_state(sender) & PN_REMOTE_CLOSED && !(pn_link_state(sender) & PN_LOCAL_CLOSED)) { std::string text = get_error_string(pn_link_remote_condition(sender), "Link detached by peer"); pn_link_close(sender); @@ -144,6 +149,7 @@ void SenderContext::check() uint32_t SenderContext::processUnsettled(bool silent) { + error.raise(); if (!silent) { check(); } @@ -693,6 +699,7 @@ void SenderContext::Delivery::settleAndReset() } void SenderContext::verify() { + error.raise(); pn_terminus_t* target = pn_link_remote_target(sender); if (!helper.isNameNull() && !pn_terminus_get_address(target)) { std::string msg("No such target : "); @@ -709,11 +716,13 @@ void SenderContext::verify() void SenderContext::configure() { + error.raise(); if (sender) configure(pn_link_target(sender)); } void SenderContext::configure(pn_terminus_t* target) { + error.raise(); helper.configure(sender, target, AddressHelper::FOR_SENDER); std::string option; if (helper.getLinkSource(option)) { @@ -725,12 +734,17 @@ void SenderContext::configure(pn_terminus_t* target) bool 
SenderContext::settled() { + error.raise(); return processUnsettled(false) == 0; } bool SenderContext::closed() { - return pn_link_state(sender) & PN_LOCAL_CLOSED; + if (sender) { + return pn_link_state(sender) & PN_LOCAL_CLOSED; + } else { + return true; + } } Address SenderContext::getAddress() const @@ -754,4 +768,14 @@ void SenderContext::resend() } } +void SenderContext::cleanup() +{ + if (!error && sender) { + error = new LinkError("sender no longer valid"); + pn_link_free(sender); + sender = 0; + + } +} + }}} // namespace qpid::messaging::amqp http://git-wip-us.apache.org/repos/asf/qpid-cpp/blob/5f5790e7/src/qpid/messaging/amqp/SenderContext.h ---------------------------------------------------------------------- diff --git a/src/qpid/messaging/amqp/SenderContext.h b/src/qpid/messaging/amqp/SenderContext.h index 8bed808..94d987f 100644 --- a/src/qpid/messaging/amqp/SenderContext.h +++ b/src/qpid/messaging/amqp/SenderContext.h @@ -26,6 +26,7 @@ #include <vector> #include <boost/shared_ptr.hpp> #include "qpid/sys/IntegerTypes.h" +#include "qpid/sys/ExceptionHolder.h" #include "qpid/sys/Time.h" #include "qpid/messaging/Address.h" #include "qpid/messaging/amqp/AddressHelper.h" @@ -117,6 +118,7 @@ class SenderContext virtual bool settled(); virtual bool closed(); virtual Address getAddress() const; + void cleanup(); protected: pn_link_t* sender; @@ -134,6 +136,7 @@ class SenderContext bool unreliable; const SenderOptions options; boost::shared_ptr<Transaction> transaction; + sys::ExceptionHolder error; uint32_t processUnsettled(bool silent); void configure(pn_terminus_t*); http://git-wip-us.apache.org/repos/asf/qpid-cpp/blob/5f5790e7/src/qpid/messaging/amqp/SessionContext.cpp ---------------------------------------------------------------------- diff --git a/src/qpid/messaging/amqp/SessionContext.cpp b/src/qpid/messaging/amqp/SessionContext.cpp index 3420a64..b57ae27 100644 --- a/src/qpid/messaging/amqp/SessionContext.cpp +++ 
b/src/qpid/messaging/amqp/SessionContext.cpp @@ -97,13 +97,21 @@ boost::shared_ptr<ReceiverContext> SessionContext::getReceiver(const std::string void SessionContext::removeReceiver(const std::string& n) { error.raise(); - receivers.erase(n); + SessionContext::ReceiverMap::iterator i = receivers.find(n); + if (i != receivers.end()) { + i->second->cleanup(); + receivers.erase(i); + } } void SessionContext::removeSender(const std::string& n) { error.raise(); - senders.erase(n); + SessionContext::SenderMap::iterator i = senders.find(n); + if (i != senders.end()) { + i->second->cleanup(); + senders.erase(i); + } } boost::shared_ptr<ReceiverContext> SessionContext::nextReceiver() @@ -258,5 +266,24 @@ void SessionContext::resetSession(pn_session_t* session_) { } } +void SessionContext::cleanup() { + if (transaction) { + transaction->cleanup(); + transaction = boost::shared_ptr<Transaction>(); + } + for (SessionContext::SenderMap::iterator i = senders.begin(); i != senders.end(); ++i) { + i->second->cleanup(); + } + senders.clear(); + for (SessionContext::ReceiverMap::iterator i = receivers.begin(); i != receivers.end(); ++i) { + i->second->cleanup(); + } + receivers.clear(); + if (!error && session) { + error = new SessionClosed(); + pn_session_free(session); + session = 0; + } +} }}} // namespace qpid::messaging::amqp http://git-wip-us.apache.org/repos/asf/qpid-cpp/blob/5f5790e7/src/qpid/messaging/amqp/SessionContext.h ---------------------------------------------------------------------- diff --git a/src/qpid/messaging/amqp/SessionContext.h b/src/qpid/messaging/amqp/SessionContext.h index 8c28208..dbefd61 100644 --- a/src/qpid/messaging/amqp/SessionContext.h +++ b/src/qpid/messaging/amqp/SessionContext.h @@ -90,6 +90,7 @@ class SessionContext void acknowledge(const qpid::framing::SequenceNumber& id, bool cummulative); void acknowledge(DeliveryMap::iterator begin, DeliveryMap::iterator end); void resetSession(pn_session_t*); + void cleanup(); }; }}} // namespace 
qpid::messaging::amqp
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org