Prepare the way to efficiently parse client requests using the SBuf-based
parser-ng.

We rely on several guarantees for this to work:

 * rawSpace(N) guarantees that the backing store is both unique to this
SBuf and has at least N bytes of unused buffer allocated.

 * SBuf member retains an active reference to the MemBlob backing store
for the duration of the read operation, except in cases where
ConnStateData gets destructed, in which case ConnStateData cancels the
read explicitly.

 * comm_read() is effectively a blocking operation with regard to reads
on the client socket. No other read is scheduled until the callback
handler has been run.

 * parsing and other manipulations of the buffer are done synchronously
by the read handler before scheduling more reads.


FWIW: this code has had one day's testing under real-world traffic from a
few heavy users with good results.

Amos
=== modified file 'src/client_side.cc'
--- src/client_side.cc  2014-02-21 10:46:19 +0000
+++ src/client_side.cc  2014-03-08 00:29:42 +0000
@@ -248,12 +248,12 @@
 
     debugs(33, 4, HERE << clientConnection << ": reading request...");
 
-    if (!maybeMakeSpaceAvailable())
+    if (!in.maybeMakeSpaceAvailable())
         return;
 
     typedef CommCbMemFunT<ConnStateData, CommIoCbParams> Dialer;
     reader = JobCallback(33, 5, Dialer, this, 
ConnStateData::clientReadRequest);
-    comm_read(clientConnection, in.addressToReadInto(), 
getAvailableBufferLength(), reader);
+    comm_read(clientConnection, in.buf.rawSpace(in.buf.spaceSize()), 
in.buf.spaceSize()-1, reader);
 }
 
 void
@@ -1884,7 +1884,7 @@
     if (!stoppedReceiving()) {
         if (const int64_t expecting = mayNeedToReadMoreBody()) {
             debugs(33, 5, HERE << "must still read " << expecting <<
-                   " request body bytes with " << in.notYetUsed << " unused");
+                   " request body bytes with " << in.buf.length() << " 
unused");
             return; // wait for the request receiver to finish reading
         }
     }
@@ -1951,7 +1951,7 @@
     ClientSocketContext *context;
     StoreIOBuffer tempBuffer;
     http = new ClientHttpRequest(csd);
-    http->req_sz = csd->in.notYetUsed;
+    http->req_sz = csd->in.buf.length();
     http->uri = xstrdup(uri);
     setLogUri (http, uri);
     context = new ClientSocketContext(csd->clientConnection, http);
@@ -2374,32 +2374,20 @@
     return result;
 }
 
-int
-ConnStateData::getAvailableBufferLength() const
-{
-    assert (in.allocatedSize > in.notYetUsed); // allocated more than used
-    const size_t result = in.allocatedSize - in.notYetUsed - 1;
-    // huge request_header_max_size may lead to more than INT_MAX unused space
-    assert (static_cast<ssize_t>(result) <= INT_MAX);
-    return result;
-}
-
 bool
-ConnStateData::maybeMakeSpaceAvailable()
+ConnStateData::In::maybeMakeSpaceAvailable()
 {
-    if (getAvailableBufferLength() < 2) {
-        size_t newSize;
-        if (in.allocatedSize >= Config.maxRequestBufferSize) {
+    if (buf.spaceSize() < 2) {
+        const SBuf::size_type haveCapacity = buf.length() + buf.spaceSize();
+        if (haveCapacity >= Config.maxRequestBufferSize) {
             debugs(33, 4, "request buffer full: 
client_request_buffer_max_size=" << Config.maxRequestBufferSize);
             return false;
         }
-        if ((newSize=in.allocatedSize * 2) > Config.maxRequestBufferSize) {
-            newSize=Config.maxRequestBufferSize;
-        }
-        in.buf = (char *)memReallocBuf(in.buf, newSize, &in.allocatedSize);
-        debugs(33, 2, "growing request buffer: notYetUsed=" << in.notYetUsed 
<< " size=" << in.allocatedSize);
+        const SBuf::size_type wantCapacity = min(Config.maxRequestBufferSize, 
haveCapacity*2);
+        buf.reserveCapacity(wantCapacity);
+        debugs(33, 2, "growing request buffer: available=" << buf.spaceSize() 
<< " used=" << buf.length());
     }
-    return true;
+    return (buf.spaceSize() >= 2);
 }
 
 void
@@ -2437,7 +2425,7 @@
         if (!ignoreErrno(xerrno)) {
             debugs(33, 2, "connReadWasError: FD " << clientConnection << ": " 
<< xstrerr(xerrno));
             return 1;
-        } else if (in.notYetUsed == 0) {
+        } else if (in.buf.isEmpty()) {
             debugs(33, 2, "connReadWasError: FD " << clientConnection << ": no 
data to process (" << xstrerr(xerrno) << ")");
         }
     }
@@ -2449,7 +2437,7 @@
 ConnStateData::connFinishedWithConn(int size)
 {
     if (size == 0) {
-        if (getConcurrentRequestCount() == 0 && in.notYetUsed == 0) {
+        if (getConcurrentRequestCount() == 0 && in.buf.isEmpty()) {
             /* no current or pending requests */
             debugs(33, 4, HERE << clientConnection << " closed");
             return 1;
@@ -2467,26 +2455,19 @@
 void
 connNoteUseOfBuffer(ConnStateData* conn, size_t byteCount)
 {
-    assert(byteCount > 0 && byteCount <= conn->in.notYetUsed);
-    conn->in.notYetUsed -= byteCount;
-    debugs(33, 5, HERE << "conn->in.notYetUsed = " << conn->in.notYetUsed);
-    /*
-     * If there is still data that will be used,
-     * move it to the beginning.
-     */
-
-    if (conn->in.notYetUsed > 0)
-        memmove(conn->in.buf, conn->in.buf + byteCount, conn->in.notYetUsed);
+    assert(byteCount > 0 && byteCount <= conn->in.buf.length());
+    conn->in.buf.consume(byteCount);
+    debugs(33, 5, "conn->in.buf has " << conn->in.buf.length() << " bytes 
unused.");
 }
 
 /// respond with ERR_TOO_BIG if request header exceeds request_header_max_size
 void
 ConnStateData::checkHeaderLimits()
 {
-    if (in.notYetUsed < Config.maxRequestHeaderSize)
+    if (in.buf.length() < Config.maxRequestHeaderSize)
         return; // can accumulte more header data
 
-    debugs(33, 3, "Request header is too large (" << in.notYetUsed << " > " <<
+    debugs(33, 3, "Request header is too large (" << in.buf.length() << " > " 
<<
            Config.maxRequestHeaderSize << " bytes)");
 
     ClientSocketContext *context = parseHttpRequestAbort(this, 
"error:request-too-large");
@@ -2637,15 +2618,15 @@
         assert (repContext);
         switch (hp->request_parse_status) {
         case Http::scHeaderTooLarge:
-            repContext->setReplyToError(ERR_TOO_BIG, Http::scBadRequest, 
method, http->uri, conn->clientConnection->remote, NULL, conn->in.buf, NULL);
+            repContext->setReplyToError(ERR_TOO_BIG, Http::scBadRequest, 
method, http->uri, conn->clientConnection->remote, NULL, conn->in.buf.c_str(), 
NULL);
             break;
         case Http::scMethodNotAllowed:
             repContext->setReplyToError(ERR_UNSUP_REQ, 
Http::scMethodNotAllowed, method, http->uri,
-                                        conn->clientConnection->remote, NULL, 
conn->in.buf, NULL);
+                                        conn->clientConnection->remote, NULL, 
conn->in.buf.c_str(), NULL);
             break;
         default:
             repContext->setReplyToError(ERR_INVALID_REQ, 
hp->request_parse_status, method, http->uri,
-                                        conn->clientConnection->remote, NULL, 
conn->in.buf, NULL);
+                                        conn->clientConnection->remote, NULL, 
conn->in.buf.c_str(), NULL);
         }
         assert(context->http->out.offset == 0);
         context->pullData();
@@ -2895,9 +2876,9 @@
 static void
 connStripBufferWhitespace (ConnStateData * conn)
 {
-    while (conn->in.notYetUsed > 0 && xisspace(conn->in.buf[0])) {
-        memmove(conn->in.buf, conn->in.buf + 1, conn->in.notYetUsed - 1);
-        -- conn->in.notYetUsed;
+    // XXX: kill this whole function.
+    while (!conn->in.buf.isEmpty() && xisspace(conn->in.buf.at(0))) {
+        conn->in.buf.consume(1);
     }
 }
 
@@ -2940,24 +2921,20 @@
 
     // Loop while we have read bytes that are not needed for producing the body
     // On errors, bodyPipe may become nil, but readMore will be cleared
-    while (in.notYetUsed > 0 && !bodyPipe && flags.readMore) {
+    while (!in.buf.isEmpty() && !bodyPipe && flags.readMore) {
         connStripBufferWhitespace(this);
 
         /* Don't try to parse if the buffer is empty */
-        if (in.notYetUsed == 0)
+        if (in.buf.isEmpty())
             break;
 
         /* Limit the number of concurrent requests */
         if (concurrentRequestQueueFilled())
             break;
 
-        /* Should not be needed anymore */
-        /* Terminate the string */
-        in.buf[in.notYetUsed] = '\0';
-
         /* Begin the parsing */
         PROF_start(parseHttpRequest);
-        HttpParserInit(&parser_, in.buf, in.notYetUsed);
+        HttpParserInit(&parser_, in.buf.c_str(), in.buf.length());
 
         /* Process request */
         Http::ProtocolVersion http_ver;
@@ -3092,14 +3069,10 @@
 bool
 ConnStateData::handleReadData(char *buf, size_t size)
 {
-    char *current_buf = in.addressToReadInto();
-
-    if (buf != current_buf)
-        memmove(current_buf, buf, size);
-
-    in.notYetUsed += size;
-
-    in.buf[in.notYetUsed] = '\0'; /* Terminate the string */
+    // XXX: make this a no-op when buf given is the MemBlob free space.
+    assert(buf == in.buf.rawSpace(1));
+    assert(size <= in.buf.spaceSize());
+    in.buf.append(buf, size);
 
     // if we are reading a body, stuff data into the body pipe
     if (bodyPipe != NULL)
@@ -3128,7 +3101,7 @@
         }
     } else { // identity encoding
         debugs(33,5, HERE << "handling plain request body for " << 
clientConnection);
-        putSize = bodyPipe->putMoreData(in.buf, in.notYetUsed);
+        putSize = bodyPipe->putMoreData(in.buf.c_str(), in.buf.length());
         if (!bodyPipe->mayNeedMoreData()) {
             // BodyPipe will clear us automagically when we produced everything
             bodyPipe = NULL;
@@ -3158,17 +3131,17 @@
 err_type
 ConnStateData::handleChunkedRequestBody(size_t &putSize)
 {
-    debugs(33,7, HERE << "chunked from " << clientConnection << ": " << 
in.notYetUsed);
+    debugs(33, 7, "chunked from " << clientConnection << ": " << 
in.buf.length());
 
     try { // the parser will throw on errors
 
-        if (!in.notYetUsed) // nothing to do (MemBuf::init requires this check)
+        if (in.buf.isEmpty()) // nothing to do
             return ERR_NONE;
 
         MemBuf raw; // ChunkedCodingParser only works with MemBufs
         // add one because MemBuf will assert if it cannot 0-terminate
-        raw.init(in.notYetUsed, in.notYetUsed+1);
-        raw.append(in.buf, in.notYetUsed);
+        raw.init(in.buf.length(), in.buf.length()+1);
+        raw.append(in.buf.c_str(), in.buf.length());
 
         const mb_size_t wasContentSize = raw.contentSize();
         BodyPipeCheckout bpc(*bodyPipe);
@@ -3308,7 +3281,7 @@
     log_addr = xact->tcpClient->remote;
     log_addr.applyMask(Config.Addrs.client_netmask);
 
-    in.buf = (char *)memAllocBuf(CLIENT_REQ_BUF_SZ, &in.allocatedSize);
+    in.buf.reserveCapacity(CLIENT_REQ_BUF_SZ);
 
     if (port->disable_pmtu_discovery != DISABLE_PMTU_OFF &&
             (transparent() || port->disable_pmtu_discovery == 
DISABLE_PMTU_ALWAYS)) {
@@ -4345,7 +4318,7 @@
         return -1; // probably need to read more, but we cannot be sure
 
     const int64_t needToProduce = bodyPipe->unproducedSize();
-    const int64_t haveAvailable = static_cast<int64_t>(in.notYetUsed);
+    const int64_t haveAvailable = static_cast<int64_t>(in.buf.length());
 
     if (needToProduce <= haveAvailable)
         return 0; // we have read what we need (but are waiting for pipe space)
@@ -4415,20 +4388,13 @@
     in.bodyParser = NULL;
 }
 
-char *
-ConnStateData::In::addressToReadInto() const
-{
-    return buf + notYetUsed;
-}
-
-ConnStateData::In::In() : bodyParser(NULL),
-        buf (NULL), notYetUsed (0), allocatedSize (0)
+ConnStateData::In::In() :
+        bodyParser(NULL),
+        buf()
 {}
 
 ConnStateData::In::~In()
 {
-    if (allocatedSize)
-        memFreeBuf(allocatedSize, buf);
     delete bodyParser; // TODO: pool
 }
 

=== modified file 'src/client_side.h'
--- src/client_side.h   2014-01-05 19:49:23 +0000
+++ src/client_side.h   2014-03-08 00:30:18 +0000
@@ -189,14 +189,12 @@
     ~ConnStateData();
 
     void readSomeData();
-    int getAvailableBufferLength() const;
     bool areAllContextsForThisConnection() const;
     void freeAllContexts();
     void notifyAllContexts(const int xerrno); ///< tell everybody about the err
     /// Traffic parsing
     bool clientParseRequests();
     void readNextRequest();
-    bool maybeMakeSpaceAvailable();
     ClientSocketContext::Pointer getCurrentContext() const;
     void addContextToQueue(ClientSocketContext * context);
     int getConcurrentRequestCount() const;
@@ -212,12 +210,10 @@
     struct In {
         In();
         ~In();
-        char *addressToReadInto() const;
+        bool maybeMakeSpaceAvailable();
 
         ChunkedCodingParser *bodyParser; ///< parses chunked request body
-        char *buf;
-        size_t notYetUsed;
-        size_t allocatedSize;
+        SBuf buf;
     } in;
 
     /** number of body bytes we need to comm_read for the "current" request

=== modified file 'src/stat.cc'
--- src/stat.cc 2014-01-24 01:57:15 +0000
+++ src/stat.cc 2014-03-07 12:47:12 +0000
@@ -2023,8 +2023,8 @@
             storeAppendPrintf(s, "\tFD %d, read %" PRId64 ", wrote %" PRId64 
"\n", fd,
                               fd_table[fd].bytes_read, 
fd_table[fd].bytes_written);
             storeAppendPrintf(s, "\tFD desc: %s\n", fd_table[fd].desc);
-            storeAppendPrintf(s, "\tin: buf %p, offset %ld, size %ld\n",
-                              conn->in.buf, (long int) conn->in.notYetUsed, 
(long int) conn->in.allocatedSize);
+            storeAppendPrintf(s, "\tin: buf %p, used %ld, free %ld\n",
+                              conn->in.buf.c_str(), (long int) 
conn->in.buf.length(), (long int) conn->in.buf.spaceSize());
             storeAppendPrintf(s, "\tremote: %s\n",
                               
conn->clientConnection->remote.toUrl(buf,MAX_IPSTRLEN));
             storeAppendPrintf(s, "\tlocal: %s\n",

=== modified file 'src/tests/stub_client_side.cc'
--- src/tests/stub_client_side.cc       2014-01-05 19:49:23 +0000
+++ src/tests/stub_client_side.cc       2014-03-08 00:32:24 +0000
@@ -29,19 +29,16 @@
 void ClientSocketContext::writeControlMsg(HttpControlMsg &msg) STUB
 
 void ConnStateData::readSomeData() STUB
-int ConnStateData::getAvailableBufferLength() const STUB_RETVAL(0)
 bool ConnStateData::areAllContextsForThisConnection() const STUB_RETVAL(false)
 void ConnStateData::freeAllContexts() STUB
 void ConnStateData::notifyAllContexts(const int xerrno) STUB
 bool ConnStateData::clientParseRequests() STUB_RETVAL(false)
 void ConnStateData::readNextRequest() STUB
-bool ConnStateData::maybeMakeSpaceAvailable() STUB_RETVAL(false)
 void ConnStateData::addContextToQueue(ClientSocketContext * context) STUB
 int ConnStateData::getConcurrentRequestCount() const STUB_RETVAL(0)
 bool ConnStateData::isOpen() const STUB_RETVAL(false)
 void ConnStateData::checkHeaderLimits() STUB
 void ConnStateData::sendControlMsg(HttpControlMsg msg) STUB
-char *ConnStateData::In::addressToReadInto() const STUB_RETVAL(NULL)
 int64_t ConnStateData::mayNeedToReadMoreBody() const STUB_RETVAL(0)
 #if USE_AUTH
 void ConnStateData::setAuth(const Auth::UserRequest::Pointer &aur, const char 
*cause) STUB
@@ -76,6 +73,8 @@
 bool ConnStateData::serveDelayedError(ClientSocketContext *context) 
STUB_RETVAL(false)
 #endif
 
+bool ConnStateData::In::maybeMakeSpaceAvailable() STUB_RETVAL(false)
+
 void setLogUri(ClientHttpRequest * http, char const *uri, bool cleanUrl) STUB
 const char *findTrailingHTTPVersion(const char *uriAndHTTPVersion, const char 
*end) STUB_RETVAL(NULL)
 int varyEvaluateMatch(StoreEntry * entry, HttpRequest * req) STUB_RETVAL(0)

Reply via email to