Author: cliffjansen
Date: Mon Mar 19 23:24:23 2012
New Revision: 1302718

URL: http://svn.apache.org/viewvc?rev=1302718&view=rev
Log:
QPID-3759 hang on heartbeat connection close

Modified:
    qpid/trunk/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp

Modified: qpid/trunk/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp?rev=1302718&r1=1302717&r2=1302718&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp Mon Mar 19 23:24:23 
2012
@@ -295,6 +295,8 @@ private:
     volatile bool queuedDelete;
     // Socket close requested, but there are operations in progress.
     volatile bool queuedClose;
+    // Most recent asynch read request
+    volatile AsynchReadResult* pendingRead;
 
 private:
     // Dispatch events that have completed.
@@ -374,6 +376,7 @@ AsynchIO::AsynchIO(const Socket& s,
     writeInProgress(false),
     queuedDelete(false),
     queuedClose(false),
+    pendingRead(0),
     working(false) {
 }
 
@@ -504,6 +507,7 @@ void AsynchIO::startReading() {
             }
         }
         // On status 0 or WSA_IO_PENDING, completion will handle the rest.
+        pendingRead = result;
     }
     else {
         notifyBuffersEmpty();
@@ -617,16 +621,17 @@ void AsynchIO::readComplete(AsynchReadRe
     int status = result->getStatus();
     size_t bytes = result->getTransferred();
     if (status == 0 && bytes > 0) {
-        bool restartRead = true;     // May not if receiver doesn't want more
         if (readCallback)
             readCallback(*this, result->getBuff());
-        if (restartRead)
-            startReading();
+        startReading();
     }
     else {
         // No data read, so put the buffer back. It may be partially filled,
         // so "unread" it back to the front of the queue.
         unread(result->getBuff());
+        if (queuedClose && status == ERROR_OPERATION_ABORTED) {
+            return; // Expected reap from CancelIoEx
+        }
         notifyEof();
         if (status != 0)
         {
@@ -697,8 +702,11 @@ void AsynchIO::completion(AsynchIoResult
             {
                 ScopedUnlock<Mutex> ul(completionLock);
                 AsynchReadResult *r = dynamic_cast<AsynchReadResult*>(result);
-                if (r != 0)
+                if (r != 0) {
                     readComplete(r);
+                    // Set pendingRead to 0 if it's still pointing to (newly 
completed) r
+                    InterlockedCompareExchangePointer((void * volatile 
*)&pendingRead, 0, r);
+                }
                 else {
                     AsynchWriteResult *w =
                         dynamic_cast<AsynchWriteResult*>(result);
@@ -732,6 +740,15 @@ void AsynchIO::completion(AsynchIoResult
         else if (queuedDelete)
             delete this;
     }
+    else {
+        if (queuedClose && pendingRead) {
+            // Force outstanding read to completion.  Layer above will
+            // call back.
+            CancelIoEx((HANDLE)toSocketHandle(socket),
+                       ((AsynchReadResult *)pendingRead)->overlapped());
+            pendingRead = 0;
+        }
+    }
 }
 
 } // namespace windows



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to