Author: astitcher
Date: Fri Nov 11 16:06:08 2011
New Revision: 1200925

URL: http://svn.apache.org/viewvc?rev=1200925&view=rev
Log:
QPID-3608: Improve C++ broker consume performance
Modified: qpid/trunk/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp qpid/trunk/qpid/cpp/src/qpid/sys/ssl/SslIo.cpp Modified: qpid/trunk/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp?rev=1200925&r1=1200924&r2=1200925&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp Fri Nov 11 16:06:08 2011 @@ -61,11 +61,10 @@ struct StaticInit { * case we could rebalance the info occasionally. */ __thread int threadReadTotal = 0; -__thread int threadMaxRead = 0; __thread int threadReadCount = 0; __thread int threadWriteTotal = 0; __thread int threadWriteCount = 0; -__thread int64_t threadMaxReadTimeNs = 2 * 1000000; // start at 2ms +__thread int64_t threadMaxIoTimeNs = 2 * 1000000; // start at 2ms } /* @@ -426,7 +425,6 @@ void AsynchIO::readable(DispatchHandle& // We have been flow controlled. 
return; } - int readTotal = 0; AbsTime readStartTime = AbsTime::now(); do { // (Try to) get a buffer @@ -441,7 +439,6 @@ void AsynchIO::readable(DispatchHandle& if (rc > 0) { buff->dataCount += rc; threadReadTotal += rc; - readTotal += rc; readCallback(*this, buff); if (readingStopped) { @@ -453,17 +450,17 @@ void AsynchIO::readable(DispatchHandle& // If we didn't fill the read buffer then time to stop reading break; } - + // Stop reading if we've overrun our timeslot - if (Duration(readStartTime, AbsTime::now()) > threadMaxReadTimeNs) { + if (Duration(readStartTime, AbsTime::now()) > threadMaxIoTimeNs) { break; } - + } else { // Put buffer back (at front so it doesn't interfere with unread buffers) bufferQueue.push_front(buff); assert(buff); - + // Eof or other side has gone away if (rc == 0 || errno == ECONNRESET) { eofCallback(*this); @@ -491,12 +488,11 @@ void AsynchIO::readable(DispatchHandle& h.unwatchRead(); break; } - + } } while (true); ++threadReadCount; - threadMaxRead = std::max(threadMaxRead, readTotal); return; } @@ -504,7 +500,7 @@ void AsynchIO::readable(DispatchHandle& * We carry on writing whilst we have data to write and we can write */ void AsynchIO::writeable(DispatchHandle& h) { - int writeTotal = 0; + AbsTime writeStartTime = AbsTime::now(); do { // See if we've got something to write if (!writeQueue.empty()) { @@ -516,7 +512,6 @@ void AsynchIO::writeable(DispatchHandle& int rc = socket.write(buff->bytes+buff->dataStart, buff->dataCount); if (rc >= 0) { threadWriteTotal += rc; - writeTotal += rc; // If we didn't write full buffer put rest back if (rc != buff->dataCount) { @@ -525,14 +520,14 @@ void AsynchIO::writeable(DispatchHandle& writeQueue.push_back(buff); break; } - + // Recycle the buffer queueReadBuffer(buff); - - // If we've already written more than the max for reading then stop - // (this is to stop writes dominating reads) - if (writeTotal > threadMaxRead) + + // Stop writing if we've overrun our timeslot + if 
(Duration(writeStartTime, AbsTime::now()) > threadMaxIoTimeNs) { break; + } } else { // Put buffer back writeQueue.push_back(buff); @@ -580,7 +575,7 @@ void AsynchIO::writeable(DispatchHandle& ++threadWriteCount; return; } - + void AsynchIO::disconnected(DispatchHandle& h) { // If we have not already queued close then call disconnected callback before closing if (!queuedClose && disCallback) disCallback(*this); Modified: qpid/trunk/qpid/cpp/src/qpid/sys/ssl/SslIo.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/ssl/SslIo.cpp?rev=1200925&r1=1200924&r2=1200925&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/sys/ssl/SslIo.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/sys/ssl/SslIo.cpp Fri Nov 11 16:06:08 2011 @@ -57,11 +57,10 @@ void ignoreSigpipe() { * case we could rebalance the info occasionally. */ __thread int threadReadTotal = 0; -__thread int threadMaxRead = 0; __thread int threadReadCount = 0; __thread int threadWriteTotal = 0; __thread int threadWriteCount = 0; -__thread int64_t threadMaxReadTimeNs = 2 * 1000000; // start at 2ms +__thread int64_t threadMaxIoTimeNs = 2 * 1000000; // start at 2ms } /* @@ -277,7 +276,6 @@ SslIO::BufferBase* SslIO::getQueuedBuffe * it in */ void SslIO::readable(DispatchHandle& h) { - int readTotal = 0; AbsTime readStartTime = AbsTime::now(); do { // (Try to) get a buffer @@ -292,24 +290,23 @@ void SslIO::readable(DispatchHandle& h) if (rc > 0) { buff->dataCount += rc; threadReadTotal += rc; - readTotal += rc; readCallback(*this, buff); if (rc != readCount) { // If we didn't fill the read buffer then time to stop reading break; } - + // Stop reading if we've overrun our timeslot - if (Duration(readStartTime, AbsTime::now()) > threadMaxReadTimeNs) { + if (Duration(readStartTime, AbsTime::now()) > threadMaxIoTimeNs) { break; } - + } else { // Put buffer back (at front so it doesn't interfere with unread buffers) 
bufferQueue.push_front(buff); assert(buff); - + // Eof or other side has gone away if (rc == 0 || errno == ECONNRESET) { eofCallback(*this); @@ -337,12 +334,11 @@ void SslIO::readable(DispatchHandle& h) h.unwatchRead(); break; } - + } } while (true); ++threadReadCount; - threadMaxRead = std::max(threadMaxRead, readTotal); return; } @@ -350,7 +346,7 @@ void SslIO::readable(DispatchHandle& h) * We carry on writing whilst we have data to write and we can write */ void SslIO::writeable(DispatchHandle& h) { - int writeTotal = 0; + AbsTime writeStartTime = AbsTime::now(); do { // See if we've got something to write if (!writeQueue.empty()) { @@ -362,7 +358,6 @@ void SslIO::writeable(DispatchHandle& h) int rc = socket.write(buff->bytes+buff->dataStart, buff->dataCount); if (rc >= 0) { threadWriteTotal += rc; - writeTotal += rc; // If we didn't write full buffer put rest back if (rc != buff->dataCount) { @@ -371,14 +366,14 @@ void SslIO::writeable(DispatchHandle& h) writeQueue.push_back(buff); break; } - + // Recycle the buffer queueReadBuffer(buff); - - // If we've already written more than the max for reading then stop - // (this is to stop writes dominating reads) - if (writeTotal > threadMaxRead) + + // Stop writing if we've overrun our timeslot + if (Duration(writeStartTime, AbsTime::now()) > threadMaxIoTimeNs) { break; + } } else { // Put buffer back writeQueue.push_back(buff); @@ -425,7 +420,7 @@ void SslIO::writeable(DispatchHandle& h) ++threadWriteCount; return; } - + void SslIO::disconnected(DispatchHandle& h) { // If we've already queued close do it instead of disconnected callback if (queuedClose) { --------------------------------------------------------------------- Apache Qpid - AMQP Messaging Implementation Project: http://qpid.apache.org Use/Interact: mailto:commits-subscr...@qpid.apache.org