Author: kgiusti Date: Fri Feb 18 15:11:17 2011 New Revision: 1072018 URL: http://svn.apache.org/viewvc?rev=1072018&view=rev Log: QPID-2935: clean up race between session destructor and scheduled callback.
Modified: qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/SessionState.cpp qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/SessionState.h Modified: qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/SessionState.cpp URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/SessionState.cpp?rev=1072018&r1=1072017&r2=1072018&view=diff ============================================================================== --- qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/SessionState.cpp (original) +++ qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/SessionState.cpp Fri Feb 18 15:11:17 2011 @@ -62,7 +62,7 @@ SessionState::SessionState( msgBuilder(&broker.getStore()), mgmtObject(0), rateFlowcontrol(0), - scheduledCmds(new std::list<SequenceNumber>) + scheduledCompleterContext(new ScheduledCompleterContext(this)) { uint32_t maxRate = broker.getOptions().maxSessionRate; if (maxRate) { @@ -103,21 +103,24 @@ SessionState::~SessionState() { flowControlTimer->cancel(); // clean up any outstanding incomplete commands - qpid::sys::ScopedLock<Mutex> l(incompleteCmdsLock); - std::map<SequenceNumber, boost::shared_ptr<IncompleteCommandContext> > copy(incompleteCmds); - incompleteCmds.clear(); - while (!copy.empty()) { - boost::shared_ptr<IncompleteCommandContext> ref(copy.begin()->second); - copy.erase(copy.begin()); - { - // note: need to drop lock, as callback may attempt to take it. - qpid::sys::ScopedUnlock<Mutex> ul(incompleteCmdsLock); - ref->cancel(); + { + qpid::sys::ScopedLock<Mutex> l(incompleteCmdsLock); + std::map<SequenceNumber, boost::shared_ptr<IncompleteCommandContext> > copy(incompleteCmds); + incompleteCmds.clear(); + while (!copy.empty()) { + boost::shared_ptr<IncompleteCommandContext> ref(copy.begin()->second); + copy.erase(copy.begin()); + { + // note: need to drop lock, as callback may attempt to take it. + qpid::sys::ScopedUnlock<Mutex> ul(incompleteCmdsLock); + ref->cancel(); + } } } + // At this point, we are guaranteed no further completion callbacks will be - // made. - scheduledCmds->clear(); // keeps IO thread from running more completions. + // made. Cancel any outstanding scheduledCompleter calls... + scheduledCompleterContext->cancel(); } AMQP_ClientProxy& SessionState::getProxy() { @@ -469,18 +472,18 @@ SessionState::createIngressMsgXferContex */ void SessionState::IncompleteIngressMsgXfer::completed(bool sync) { - qpid::sys::ScopedLock<Mutex> l(session->incompleteCmdsLock); if (!sync) { - // note well: this path may execute in any thread. + /** note well: this path may execute in any thread. It is safe to access + * the session, as the SessionState destructor will cancel all outstanding + * callbacks before getting destroyed (so we'll never get here). + */ QPID_LOG(debug, ": async completion callback scheduled for msg seq=" << id); - session->scheduledCmds->push_back(id); - if (session->scheduledCmds->size() == 1) { + if (session->scheduledCompleterContext->scheduleCompletion(id)) session->getConnection().requestIOProcessing(boost::bind(&scheduledCompleter, - session->scheduledCmds, - session)); - } + session->scheduledCompleterContext)); } else { // command is being completed in IO thread. // this path runs only on the IO thread. + qpid::sys::ScopedLock<Mutex> l(session->incompleteCmdsLock); std::map<SequenceNumber, boost::shared_ptr<IncompleteCommandContext> >::iterator cmd; cmd = session->incompleteCmds.find(id); if (cmd != session->incompleteCmds.end()) { @@ -502,29 +505,57 @@ void SessionState::IncompleteIngressMsgX * completed commands in the IO Thread. Guaranteed not to be running at the same * time as the message receive code. */ -void SessionState::scheduledCompleter(boost::shared_ptr< std::list<SequenceNumber> > completedCmds, - SessionState *session) +void SessionState::scheduledCompleter(boost::shared_ptr<SessionState::ScheduledCompleterContext> ctxt) { - // when session is destroyed, it clears the list below. If the list is empty, - // the passed session pointer is not valid - do nothing. - if (completedCmds->empty()) return; - - qpid::sys::ScopedLock<Mutex> l(session->incompleteCmdsLock); - std::list<SequenceNumber> cmds(*completedCmds); // make copy so we can drop lock - completedCmds->clear(); - - while (!cmds.empty()) { - SequenceNumber id = cmds.front(); - cmds.pop_front(); + ctxt->completeCommands(); +} + + +/** mark a command (sequence) as completed, return True if caller should + * schedule a call to completeCommands() + */ +bool SessionState::ScheduledCompleterContext::scheduleCompletion(SequenceNumber cmd) +{ + qpid::sys::ScopedLock<qpid::sys::Mutex> l(completedCmdsLock); + + completedCmds.push_back(cmd); + return (completedCmds.size() == 1); +} + + +/** Cause the session to complete all completed commands */ +void SessionState::ScheduledCompleterContext::completeCommands() +{ + qpid::sys::ScopedLock<qpid::sys::Mutex> l(completedCmdsLock); + + // when session is destroyed, it clears the session pointer via cancel(). + if (!session) return; + + while (!completedCmds.empty()) { + SequenceNumber id = completedCmds.front(); + completedCmds.pop_front(); std::map<SequenceNumber, boost::shared_ptr<IncompleteCommandContext> >::iterator cmd; + { + qpid::sys::ScopedLock<qpid::sys::Mutex> l(session->incompleteCmdsLock); - cmd = session->incompleteCmds.find(id); - if (cmd != session->incompleteCmds.end()) { - qpid::sys::ScopedUnlock<Mutex> ul(session->incompleteCmdsLock); - cmd->second->do_completion(); // retakes lock + cmd = session->incompleteCmds.find(id); + if (cmd !=session->incompleteCmds.end()) { + boost::shared_ptr<IncompleteCommandContext> tmp(cmd->second); + { + qpid::sys::ScopedUnlock<qpid::sys::Mutex> ul(session->incompleteCmdsLock); + tmp->do_completion(); // retakes incompleteCmdslock + } + } } } } +/** cancel any pending calls to scheduleComplete */ +void SessionState::ScheduledCompleterContext::cancel() +{ + qpid::sys::ScopedLock<qpid::sys::Mutex> l(completedCmdsLock); + session = 0; +} + }} // namespace qpid::broker Modified: qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/SessionState.h URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/SessionState.h?rev=1072018&r1=1072017&r2=1072018&view=diff ============================================================================== --- qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/SessionState.h (original) +++ qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/SessionState.h Fri Feb 18 15:11:17 2011 @@ -226,13 +226,29 @@ class SessionState : public qpid::Sessio * flow-control, etc). before the command can be completed to the client */ std::map<SequenceNumber, boost::shared_ptr<IncompleteCommandContext> > incompleteCmds; - // identifies those commands in incompleteCmds that are waiting for IO thread to run in order to be completed. - boost::shared_ptr< std::list<SequenceNumber> > scheduledCmds; - qpid::sys::Mutex incompleteCmdsLock; // locks both above containers - - /** runs in IO thread, completes commands that where finished asynchronously. */ - static void scheduledCompleter(boost::shared_ptr< std::list<SequenceNumber> > scheduledCmds, - SessionState *session); + qpid::sys::Mutex incompleteCmdsLock; // locks above container + + /** This context is shared between the SessionState and scheduledCompleter, + * holds the sequence numbers of all commands that have completed asynchronously. + */ + class ScheduledCompleterContext { + private: + std::list<SequenceNumber> completedCmds; + // ordering: take this lock first, then incompleteCmdsLock + qpid::sys::Mutex completedCmdsLock; + SessionState *session; + public: + ScheduledCompleterContext(SessionState *s) : session(s) {}; + bool scheduleCompletion(SequenceNumber cmd); + void completeCommands(); + void cancel(); + }; + boost::shared_ptr<ScheduledCompleterContext> scheduledCompleterContext; + + /** The following method runs the in IO thread and completes commands that + * where finished asynchronously. + */ + static void scheduledCompleter(boost::shared_ptr<ScheduledCompleterContext>); friend class SessionManager; }; --------------------------------------------------------------------- Apache Qpid - AMQP Messaging Implementation Project: http://qpid.apache.org Use/Interact: mailto:commits-subscr...@qpid.apache.org