Author: cliffjansen
Date: Mon Mar 19 23:24:23 2012
New Revision: 1302718

URL: http://svn.apache.org/viewvc?rev=1302718&view=rev
Log:
QPID-3759 hang on heartbeat connection close

Modified:
    qpid/trunk/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp

Modified: qpid/trunk/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp?rev=1302718&r1=1302717&r2=1302718&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp Mon Mar 19 23:24:23 2012
@@ -295,6 +295,8 @@ private:
     volatile bool queuedDelete;
     // Socket close requested, but there are operations in progress.
     volatile bool queuedClose;
+    // Most recent asynch read request
+    volatile AsynchReadResult* pendingRead;
 
 private:
     // Dispatch events that have completed.
@@ -374,6 +376,7 @@ AsynchIO::AsynchIO(const Socket& s,
     writeInProgress(false),
     queuedDelete(false),
     queuedClose(false),
+    pendingRead(0),
     working(false) {
 }
 
@@ -504,6 +507,7 @@ void AsynchIO::startReading() {
             }
         }
         // On status 0 or WSA_IO_PENDING, completion will handle the rest.
+        pendingRead = result;
     }
     else {
         notifyBuffersEmpty();
@@ -617,16 +621,17 @@ void AsynchIO::readComplete(AsynchReadRe
     int status = result->getStatus();
     size_t bytes = result->getTransferred();
     if (status == 0 && bytes > 0) {
-        bool restartRead = true; // May not if receiver doesn't want more
         if (readCallback)
             readCallback(*this, result->getBuff());
-        if (restartRead)
-            startReading();
+        startReading();
     }
     else {
         // No data read, so put the buffer back. It may be partially filled,
         // so "unread" it back to the front of the queue.
         unread(result->getBuff());
+        if (queuedClose && status == ERROR_OPERATION_ABORTED) {
+            return; // Expected reap from CancelIoEx
+        }
         notifyEof();
         if (status != 0)
         {
@@ -697,8 +702,11 @@ void AsynchIO::completion(AsynchIoResult
     {
         ScopedUnlock<Mutex> ul(completionLock);
         AsynchReadResult *r = dynamic_cast<AsynchReadResult*>(result);
-        if (r != 0)
+        if (r != 0) {
             readComplete(r);
+            // Set pendingRead to 0 if it's still pointing to (newly completed) r
+            InterlockedCompareExchangePointer((void * volatile *)&pendingRead, 0, r);
+        }
         else {
             AsynchWriteResult *w =
                 dynamic_cast<AsynchWriteResult*>(result);
@@ -732,6 +740,15 @@ void AsynchIO::completion(AsynchIoResult
         else if (queuedDelete)
             delete this;
     }
+    else {
+        if (queuedClose && pendingRead) {
+            // Force outstanding read to completion. Layer above will
+            // call back.
+            CancelIoEx((HANDLE)toSocketHandle(socket),
+                       ((AsynchReadResult *)pendingRead)->overlapped());
+            pendingRead = 0;
+        }
+    }
 }
 
 } // namespace windows

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org