Author: kgiusti
Date: Fri Feb 18 15:11:17 2011
New Revision: 1072018

URL: http://svn.apache.org/viewvc?rev=1072018&view=rev
Log:
QPID-2935: clean up race between session destructor and scheduled callback.

Modified:
    qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/SessionState.cpp
    qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/SessionState.h

Modified: qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/SessionState.cpp
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/SessionState.cpp?rev=1072018&r1=1072017&r2=1072018&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/SessionState.cpp (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/SessionState.cpp Fri Feb 
18 15:11:17 2011
@@ -62,7 +62,7 @@ SessionState::SessionState(
       msgBuilder(&broker.getStore()),
       mgmtObject(0),
       rateFlowcontrol(0),
-      scheduledCmds(new std::list<SequenceNumber>)
+      scheduledCompleterContext(new ScheduledCompleterContext(this))
 {
     uint32_t maxRate = broker.getOptions().maxSessionRate;
     if (maxRate) {
@@ -103,21 +103,24 @@ SessionState::~SessionState() {
         flowControlTimer->cancel();
 
     // clean up any outstanding incomplete commands
-    qpid::sys::ScopedLock<Mutex> l(incompleteCmdsLock);
-    std::map<SequenceNumber, boost::shared_ptr<IncompleteCommandContext> > 
copy(incompleteCmds);
-    incompleteCmds.clear();
-    while (!copy.empty()) {
-        boost::shared_ptr<IncompleteCommandContext> ref(copy.begin()->second);
-        copy.erase(copy.begin());
-        {
-            // note: need to drop lock, as callback may attempt to take it.
-            qpid::sys::ScopedUnlock<Mutex> ul(incompleteCmdsLock);
-            ref->cancel();
+    {
+        qpid::sys::ScopedLock<Mutex> l(incompleteCmdsLock);
+        std::map<SequenceNumber, boost::shared_ptr<IncompleteCommandContext> > 
copy(incompleteCmds);
+        incompleteCmds.clear();
+        while (!copy.empty()) {
+            boost::shared_ptr<IncompleteCommandContext> 
ref(copy.begin()->second);
+            copy.erase(copy.begin());
+            {
+                // note: need to drop lock, as callback may attempt to take it.
+                qpid::sys::ScopedUnlock<Mutex> ul(incompleteCmdsLock);
+                ref->cancel();
+            }
         }
     }
+
     // At this point, we are guaranteed no further completion callbacks will be
-    // made.
-    scheduledCmds->clear();  // keeps IO thread from running more completions.
+    // made.  Cancel any outstanding scheduledCompleter calls...
+    scheduledCompleterContext->cancel();
 }
 
 AMQP_ClientProxy& SessionState::getProxy() {
@@ -469,18 +472,18 @@ SessionState::createIngressMsgXferContex
  */
 void SessionState::IncompleteIngressMsgXfer::completed(bool sync)
 {
-    qpid::sys::ScopedLock<Mutex> l(session->incompleteCmdsLock);
     if (!sync) {
-        // note well: this path may execute in any thread.
+        /** note well: this path may execute in any thread.  It is safe to 
access
+         * the session, as the SessionState destructor will cancel all 
outstanding
+         * callbacks before getting destroyed (so we'll never get here).
+         */
         QPID_LOG(debug, ": async completion callback scheduled for msg seq=" 
<< id);
-        session->scheduledCmds->push_back(id);
-        if (session->scheduledCmds->size() == 1) {
+        if (session->scheduledCompleterContext->scheduleCompletion(id))
             
session->getConnection().requestIOProcessing(boost::bind(&scheduledCompleter,
-                                                                     
session->scheduledCmds,
-                                                                     session));
-        }
+                                                                     
session->scheduledCompleterContext));
     } else {  // command is being completed in IO thread.
         // this path runs only on the IO thread.
+        qpid::sys::ScopedLock<Mutex> l(session->incompleteCmdsLock);
         std::map<SequenceNumber, boost::shared_ptr<IncompleteCommandContext> 
>::iterator cmd;
         cmd = session->incompleteCmds.find(id);
         if (cmd != session->incompleteCmds.end()) {
@@ -502,29 +505,57 @@ void SessionState::IncompleteIngressMsgX
  * completed commands in the IO Thread.  Guaranteed not to be running at the 
same
  * time as the message receive code.
  */
-void SessionState::scheduledCompleter(boost::shared_ptr< 
std::list<SequenceNumber> > completedCmds,
-                                      SessionState *session)
+void 
SessionState::scheduledCompleter(boost::shared_ptr<SessionState::ScheduledCompleterContext>
 ctxt)
 {
-    // when session is destroyed, it clears the list below. If the list is 
empty,
-    // the passed session pointer is not valid - do nothing.
-    if (completedCmds->empty()) return;
-
-    qpid::sys::ScopedLock<Mutex> l(session->incompleteCmdsLock);
-    std::list<SequenceNumber> cmds(*completedCmds); // make copy so we can 
drop lock
-    completedCmds->clear();
-
-    while (!cmds.empty()) {
-        SequenceNumber id = cmds.front();
-        cmds.pop_front();
+    ctxt->completeCommands();
+}
+
+
+/** mark a command (sequence) as completed, return true if the caller should
+ * schedule a call to completeCommands()
+ */
+bool 
SessionState::ScheduledCompleterContext::scheduleCompletion(SequenceNumber cmd)
+{
+    qpid::sys::ScopedLock<qpid::sys::Mutex> l(completedCmdsLock);
+
+    completedCmds.push_back(cmd);
+    return (completedCmds.size() == 1);
+}
+
+
+/** Cause the session to complete all completed commands */
+void SessionState::ScheduledCompleterContext::completeCommands()
+{
+    qpid::sys::ScopedLock<qpid::sys::Mutex> l(completedCmdsLock);
+
+    // when session is destroyed, it clears the session pointer via cancel().
+    if (!session) return;
+
+    while (!completedCmds.empty()) {
+        SequenceNumber id = completedCmds.front();
+        completedCmds.pop_front();
         std::map<SequenceNumber, boost::shared_ptr<IncompleteCommandContext> 
>::iterator cmd;
+        {
+            qpid::sys::ScopedLock<qpid::sys::Mutex> 
l(session->incompleteCmdsLock);
 
-        cmd = session->incompleteCmds.find(id);
-        if (cmd != session->incompleteCmds.end()) {
-            qpid::sys::ScopedUnlock<Mutex> ul(session->incompleteCmdsLock);
-            cmd->second->do_completion();   // retakes lock
+            cmd = session->incompleteCmds.find(id);
+            if (cmd !=session->incompleteCmds.end()) {
+                boost::shared_ptr<IncompleteCommandContext> tmp(cmd->second);
+                {
+                    qpid::sys::ScopedUnlock<qpid::sys::Mutex> 
ul(session->incompleteCmdsLock);
+                    tmp->do_completion();   // retakes incompleteCmdslock
+                }
+            }
         }
     }
 }
 
 
+/** cancel any pending calls to scheduledCompleter */
+void SessionState::ScheduledCompleterContext::cancel()
+{
+    qpid::sys::ScopedLock<qpid::sys::Mutex> l(completedCmdsLock);
+    session = 0;
+}
+
 }} // namespace qpid::broker

Modified: qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/SessionState.h
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/SessionState.h?rev=1072018&r1=1072017&r2=1072018&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/SessionState.h (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/SessionState.h Fri Feb 18 
15:11:17 2011
@@ -226,13 +226,29 @@ class SessionState : public qpid::Sessio
      * flow-control, etc). before the command can be completed to the client
      */
     std::map<SequenceNumber, boost::shared_ptr<IncompleteCommandContext> > 
incompleteCmds;
-    // identifies those commands in incompleteCmds that are waiting for IO 
thread to run in order to be completed.
-    boost::shared_ptr< std::list<SequenceNumber> > scheduledCmds;
-    qpid::sys::Mutex incompleteCmdsLock;  // locks both above containers
-
-    /** runs in IO thread, completes commands that where finished 
asynchronously. */
-    static void scheduledCompleter(boost::shared_ptr< 
std::list<SequenceNumber> > scheduledCmds,
-                                   SessionState *session);
+    qpid::sys::Mutex incompleteCmdsLock;  // locks above container
+
+    /** This context is shared between the SessionState and scheduledCompleter,
+     * holds the sequence numbers of all commands that have completed 
asynchronously.
+     */
+    class ScheduledCompleterContext {
+    private:
+        std::list<SequenceNumber> completedCmds;
+        // ordering: take this lock first, then incompleteCmdsLock
+        qpid::sys::Mutex completedCmdsLock;
+        SessionState *session;
+    public:
+        ScheduledCompleterContext(SessionState *s) : session(s) {};
+        bool scheduleCompletion(SequenceNumber cmd);
+        void completeCommands();
+        void cancel();
+    };
+    boost::shared_ptr<ScheduledCompleterContext> scheduledCompleterContext;
+
+    /** The following method runs in the IO thread and completes commands that
+     * were finished asynchronously.
+     */
+    static void 
scheduledCompleter(boost::shared_ptr<ScheduledCompleterContext>);
 
     friend class SessionManager;
 };



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscr...@qpid.apache.org

Reply via email to