Author: astitcher
Date: Fri Nov 11 16:06:08 2011
New Revision: 1200925

URL: http://svn.apache.org/viewvc?rev=1200925&view=rev
Log:
QPID-3608: Improve C++ broker consume performance

Modified:
    qpid/trunk/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp
    qpid/trunk/qpid/cpp/src/qpid/sys/ssl/SslIo.cpp

Modified: qpid/trunk/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp?rev=1200925&r1=1200924&r2=1200925&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp Fri Nov 11 16:06:08 2011
@@ -61,11 +61,10 @@ struct StaticInit {
  * case we could rebalance the info occasionally.  
  */
 __thread int threadReadTotal = 0;
-__thread int threadMaxRead = 0;
 __thread int threadReadCount = 0;
 __thread int threadWriteTotal = 0;
 __thread int threadWriteCount = 0;
-__thread int64_t threadMaxReadTimeNs = 2 * 1000000; // start at 2ms
+__thread int64_t threadMaxIoTimeNs = 2 * 1000000; // start at 2ms
 }
 
 /*
@@ -426,7 +425,6 @@ void AsynchIO::readable(DispatchHandle& 
         // We have been flow controlled.
         return;
     }
-    int readTotal = 0;
     AbsTime readStartTime = AbsTime::now();
     do {
         // (Try to) get a buffer
@@ -441,7 +439,6 @@ void AsynchIO::readable(DispatchHandle& 
             if (rc > 0) {
                 buff->dataCount += rc;
                 threadReadTotal += rc;
-                readTotal += rc;
 
                 readCallback(*this, buff);
                 if (readingStopped) {
@@ -453,17 +450,17 @@ void AsynchIO::readable(DispatchHandle& 
                     // If we didn't fill the read buffer then time to stop reading
                     break;
                 }
-                
+
                 // Stop reading if we've overrun our timeslot
-                if (Duration(readStartTime, AbsTime::now()) > threadMaxReadTimeNs) {
+                if (Duration(readStartTime, AbsTime::now()) > threadMaxIoTimeNs) {
                     break;
                 }
-                
+
             } else {
                 // Put buffer back (at front so it doesn't interfere with unread buffers)
                 bufferQueue.push_front(buff);
                 assert(buff);
-                
+
                 // Eof or other side has gone away
                 if (rc == 0 || errno == ECONNRESET) {
                     eofCallback(*this);
@@ -491,12 +488,11 @@ void AsynchIO::readable(DispatchHandle& 
                 h.unwatchRead();
                 break;
             }
-            
+
         }
     } while (true);
 
     ++threadReadCount;
-    threadMaxRead = std::max(threadMaxRead, readTotal);
     return;
 }
 
@@ -504,7 +500,7 @@ void AsynchIO::readable(DispatchHandle& 
  * We carry on writing whilst we have data to write and we can write
  */
 void AsynchIO::writeable(DispatchHandle& h) {
-    int writeTotal = 0;
+    AbsTime writeStartTime = AbsTime::now();
     do {
         // See if we've got something to write
         if (!writeQueue.empty()) {
@@ -516,7 +512,6 @@ void AsynchIO::writeable(DispatchHandle&
             int rc = socket.write(buff->bytes+buff->dataStart, buff->dataCount);
             if (rc >= 0) {
                 threadWriteTotal += rc;
-                writeTotal += rc;
 
                 // If we didn't write full buffer put rest back
                 if (rc != buff->dataCount) {
@@ -525,14 +520,14 @@ void AsynchIO::writeable(DispatchHandle&
                     writeQueue.push_back(buff);
                     break;
                 }
-                
+
                 // Recycle the buffer
                 queueReadBuffer(buff);
-                
-                // If we've already written more than the max for reading then stop
-                // (this is to stop writes dominating reads) 
-                if (writeTotal > threadMaxRead)
+
+                // Stop writing if we've overrun our timeslot
+                if (Duration(writeStartTime, AbsTime::now()) > threadMaxIoTimeNs) {
                     break;
+                }
             } else {
                 // Put buffer back
                 writeQueue.push_back(buff);
@@ -580,7 +575,7 @@ void AsynchIO::writeable(DispatchHandle&
     ++threadWriteCount;
     return;
 }
-        
+
 void AsynchIO::disconnected(DispatchHandle& h) {
     // If we have not already queued close then call disconnected callback before closing
     if (!queuedClose && disCallback) disCallback(*this);

Modified: qpid/trunk/qpid/cpp/src/qpid/sys/ssl/SslIo.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/ssl/SslIo.cpp?rev=1200925&r1=1200924&r2=1200925&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/ssl/SslIo.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/ssl/SslIo.cpp Fri Nov 11 16:06:08 2011
@@ -57,11 +57,10 @@ void ignoreSigpipe() {
  * case we could rebalance the info occasionally.  
  */
 __thread int threadReadTotal = 0;
-__thread int threadMaxRead = 0;
 __thread int threadReadCount = 0;
 __thread int threadWriteTotal = 0;
 __thread int threadWriteCount = 0;
-__thread int64_t threadMaxReadTimeNs = 2 * 1000000; // start at 2ms
+__thread int64_t threadMaxIoTimeNs = 2 * 1000000; // start at 2ms
 }
 
 /*
@@ -277,7 +276,6 @@ SslIO::BufferBase* SslIO::getQueuedBuffe
  * it in
  */
 void SslIO::readable(DispatchHandle& h) {
-    int readTotal = 0;
     AbsTime readStartTime = AbsTime::now();
     do {
         // (Try to) get a buffer
@@ -292,24 +290,23 @@ void SslIO::readable(DispatchHandle& h) 
             if (rc > 0) {
                 buff->dataCount += rc;
                 threadReadTotal += rc;
-                readTotal += rc;
 
                 readCallback(*this, buff);
                 if (rc != readCount) {
                     // If we didn't fill the read buffer then time to stop reading
                     break;
                 }
-                
+
                 // Stop reading if we've overrun our timeslot
-                if (Duration(readStartTime, AbsTime::now()) > threadMaxReadTimeNs) {
+                if (Duration(readStartTime, AbsTime::now()) > threadMaxIoTimeNs) {
                     break;
                 }
-                
+
             } else {
                 // Put buffer back (at front so it doesn't interfere with unread buffers)
                 bufferQueue.push_front(buff);
                 assert(buff);
-                
+
                 // Eof or other side has gone away
                 if (rc == 0 || errno == ECONNRESET) {
                     eofCallback(*this);
@@ -337,12 +334,11 @@ void SslIO::readable(DispatchHandle& h) 
                 h.unwatchRead();
                 break;
             }
-            
+
         }
     } while (true);
 
     ++threadReadCount;
-    threadMaxRead = std::max(threadMaxRead, readTotal);
     return;
 }
 
@@ -350,7 +346,7 @@ void SslIO::readable(DispatchHandle& h) 
  * We carry on writing whilst we have data to write and we can write
  */
 void SslIO::writeable(DispatchHandle& h) {
-    int writeTotal = 0;
+    AbsTime writeStartTime = AbsTime::now();
     do {
         // See if we've got something to write
         if (!writeQueue.empty()) {
@@ -362,7 +358,6 @@ void SslIO::writeable(DispatchHandle& h)
             int rc = socket.write(buff->bytes+buff->dataStart, buff->dataCount);
             if (rc >= 0) {
                 threadWriteTotal += rc;
-                writeTotal += rc;
 
                 // If we didn't write full buffer put rest back
                 if (rc != buff->dataCount) {
@@ -371,14 +366,14 @@ void SslIO::writeable(DispatchHandle& h)
                     writeQueue.push_back(buff);
                     break;
                 }
-                
+
                 // Recycle the buffer
                 queueReadBuffer(buff);
-                
-                // If we've already written more than the max for reading then stop
-                // (this is to stop writes dominating reads) 
-                if (writeTotal > threadMaxRead)
+
+                // Stop writing if we've overrun our timeslot
+                if (Duration(writeStartTime, AbsTime::now()) > threadMaxIoTimeNs) {
                     break;
+                }
             } else {
                 // Put buffer back
                 writeQueue.push_back(buff);
@@ -425,7 +420,7 @@ void SslIO::writeable(DispatchHandle& h)
     ++threadWriteCount;
     return;
 }
-        
+
 void SslIO::disconnected(DispatchHandle& h) {
     // If we've already queued close do it instead of disconnected callback
     if (queuedClose) {



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscr...@qpid.apache.org

Reply via email to