Author: astitcher Date: Tue May 18 21:33:15 2010 New Revision: 945899 URL: http://svn.apache.org/viewvc?rev=945899&view=rev Log: Fix the behaviour of the EpollPoller when shutdowns and interrupts interact
Modified: qpid/trunk/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp qpid/trunk/qpid/cpp/src/tests/PollerTest.cpp Modified: qpid/trunk/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp?rev=945899&r1=945898&r2=945899&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp Tue May 18 21:33:15 2010 @@ -536,6 +536,12 @@ Poller::Event Poller::wait(Duration time // Check if this is an interrupt PollerPrivate::InterruptHandle& interruptHandle = impl->interruptHandle; if (dataPtr == &interruptHandle) { + // If we are shutting down we need to rearm the shutdown interrupt to + // ensure everyone still sees it. It's okay that this might be overridden + // below as we will be back here if it is. + if (impl->isShutdown) { + impl->interruptAll(); + } PollerHandle* wrappedHandle = 0; { ScopedLock<Mutex> l(interruptHandle.impl->lock); Modified: qpid/trunk/qpid/cpp/src/tests/PollerTest.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/PollerTest.cpp?rev=945899&r1=945898&r2=945899&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/tests/PollerTest.cpp (original) +++ qpid/trunk/qpid/cpp/src/tests/PollerTest.cpp Tue May 18 21:33:15 2010 @@ -69,20 +69,24 @@ int readALot(int fd) { return bytesRead; } +void makesocketpair(int (&sv)[2]) { + int rc = ::socketpair(AF_UNIX, SOCK_STREAM, 0, sv); + assert(rc >= 0); + + // Set non-blocking + rc = ::fcntl(sv[0], F_SETFL, O_NONBLOCK); + assert(rc >= 0); + + rc = ::fcntl(sv[1], F_SETFL, O_NONBLOCK); + assert(rc >= 0); +} + int main(int /*argc*/, char** /*argv*/) { try { int sv[2]; - int rc = ::socketpair(AF_UNIX, SOCK_STREAM, 0, sv); - assert(rc >= 0); - - // Set non-blocking - rc = ::fcntl(sv[0], F_SETFL, O_NONBLOCK); - assert(rc >= 0); - - rc = ::fcntl(sv[1], F_SETFL, O_NONBLOCK); - assert(rc >= 0); + makesocketpair(sv); // Make up a large string string testString = "This is only a test ... 1,2,3,4,5,6,7,8,9,10;"; @@ -92,16 +96,13 @@ int main(int /*argc*/, char** /*argv*/) // Read as much as we can from socket 0 int bytesRead = readALot(sv[0]); assert(bytesRead == 0); - cout << "Read(0): " << bytesRead << " bytes\n"; // Write as much as we can to socket 0 int bytesWritten = writeALot(sv[0], testString); - cout << "Wrote(0): " << bytesWritten << " bytes\n"; // Read as much as we can from socket 1 bytesRead = readALot(sv[1]); assert(bytesRead == bytesWritten); - cout << "Read(1): " << bytesRead << " bytes\n"; auto_ptr<Poller> poller(new Poller); @@ -121,7 +122,6 @@ int main(int /*argc*/, char** /*argv*/) // Write as much as we can to socket 0 bytesWritten = writeALot(sv[0], testString); - cout << "Wrote(0): " << bytesWritten << " bytes\n"; // Wait for 500ms - h0 no longer writable event = poller->wait(500000000); @@ -136,7 +136,6 @@ int main(int /*argc*/, char** /*argv*/) bytesRead = readALot(sv[1]); assert(bytesRead == bytesWritten); - cout << "Read(1): " << bytesRead << " bytes\n"; // Test poller interrupt assert(poller->interrupt(h0) == true); @@ -218,6 +217,43 @@ int main(int /*argc*/, char** /*argv*/) assert(event.handle == 0); assert(event.type == Poller::SHUTDOWN); + ::close(sv[0]); + + // Test for correct interaction of shutdown and interrupts - need to have new poller + // etc. for this + makesocketpair(sv); + + auto_ptr<Poller> poller1(new Poller); + + PosixIOHandle f2(sv[0]); + PosixIOHandle f3(sv[1]); + + PollerHandle h2(f2); + PollerHandle h3(f3); + + poller1->registerHandle(h2); + poller1->monitorHandle(h2, Poller::INOUT); + event = poller1->wait(); + assert(event.handle == &h2); + assert(event.type == Poller::WRITABLE); + + // Shutdown + poller1->shutdown(); + event = poller1->wait(); + assert(event.handle == 0); + assert(event.type == Poller::SHUTDOWN); + + assert(poller1->interrupt(h2) == true); + event = poller1->wait(); + assert(event.handle == &h2); + assert(event.type == Poller::INTERRUPTED); + poller1->unmonitorHandle(h2, Poller::INOUT); + + event = poller1->wait(); + assert(event.handle == 0); + assert(event.type == Poller::SHUTDOWN); + + poller1->unregisterHandle(h2); return 0; } catch (exception& e) { cout << "Caught exception " << e.what() << "\n"; --------------------------------------------------------------------- Apache Qpid - AMQP Messaging Implementation Project: http://qpid.apache.org Use/Interact: mailto:commits-subscr...@qpid.apache.org