Prepare the way to efficiently parse client requests using SBuf based parser-ng.
We rely on several guarantees for this to work: * rawSpace(N) guarantees that the backing store is both unique to this SBuf and has at least N bytes of unused buffer allocated. * SBuf member retains an active reference to the MemBlob backing store for the duration of the read operation. Except in cases whete ConnStateData gets destructed, in which case ConnStateData cancels the read explicitly. * comm_read() is effectively a blocking operation with regards to read on the client socket. No other read is scheduled until the callback handler has been run. * parsing and other manipulations of the buffer are done synchronously by the read handler before scheduling more reads. FWIW: this code has had one days testing under real-world traffic from a few heavy users with good results. Amos
=== modified file 'src/client_side.cc' --- src/client_side.cc 2014-02-21 10:46:19 +0000 +++ src/client_side.cc 2014-03-08 00:29:42 +0000 @@ -248,12 +248,12 @@ debugs(33, 4, HERE << clientConnection << ": reading request..."); - if (!maybeMakeSpaceAvailable()) + if (!in.maybeMakeSpaceAvailable()) return; typedef CommCbMemFunT<ConnStateData, CommIoCbParams> Dialer; reader = JobCallback(33, 5, Dialer, this, ConnStateData::clientReadRequest); - comm_read(clientConnection, in.addressToReadInto(), getAvailableBufferLength(), reader); + comm_read(clientConnection, in.buf.rawSpace(in.buf.spaceSize()), in.buf.spaceSize()-1, reader); } void @@ -1884,7 +1884,7 @@ if (!stoppedReceiving()) { if (const int64_t expecting = mayNeedToReadMoreBody()) { debugs(33, 5, HERE << "must still read " << expecting << - " request body bytes with " << in.notYetUsed << " unused"); + " request body bytes with " << in.buf.length() << " unused"); return; // wait for the request receiver to finish reading } } @@ -1951,7 +1951,7 @@ ClientSocketContext *context; StoreIOBuffer tempBuffer; http = new ClientHttpRequest(csd); - http->req_sz = csd->in.notYetUsed; + http->req_sz = csd->in.buf.length(); http->uri = xstrdup(uri); setLogUri (http, uri); context = new ClientSocketContext(csd->clientConnection, http); @@ -2374,32 +2374,20 @@ return result; } -int -ConnStateData::getAvailableBufferLength() const -{ - assert (in.allocatedSize > in.notYetUsed); // allocated more than used - const size_t result = in.allocatedSize - in.notYetUsed - 1; - // huge request_header_max_size may lead to more than INT_MAX unused space - assert (static_cast<ssize_t>(result) <= INT_MAX); - return result; -} - bool -ConnStateData::maybeMakeSpaceAvailable() +ConnStateData::In::maybeMakeSpaceAvailable() { - if (getAvailableBufferLength() < 2) { - size_t newSize; - if (in.allocatedSize >= Config.maxRequestBufferSize) { + if (buf.spaceSize() < 2) { + const SBuf::size_type haveCapacity = buf.length() + buf.spaceSize(); + if (haveCapacity >= Config.maxRequestBufferSize) { debugs(33, 4, "request buffer full: client_request_buffer_max_size=" << Config.maxRequestBufferSize); return false; } - if ((newSize=in.allocatedSize * 2) > Config.maxRequestBufferSize) { - newSize=Config.maxRequestBufferSize; - } - in.buf = (char *)memReallocBuf(in.buf, newSize, &in.allocatedSize); - debugs(33, 2, "growing request buffer: notYetUsed=" << in.notYetUsed << " size=" << in.allocatedSize); + const SBuf::size_type wantCapacity = min(Config.maxRequestBufferSize, haveCapacity*2); + buf.reserveCapacity(wantCapacity); + debugs(33, 2, "growing request buffer: available=" << buf.spaceSize() << " used=" << buf.length()); } - return true; + return (buf.spaceSize() >= 2); } void @@ -2437,7 +2425,7 @@ if (!ignoreErrno(xerrno)) { debugs(33, 2, "connReadWasError: FD " << clientConnection << ": " << xstrerr(xerrno)); return 1; - } else if (in.notYetUsed == 0) { + } else if (in.buf.isEmpty()) { debugs(33, 2, "connReadWasError: FD " << clientConnection << ": no data to process (" << xstrerr(xerrno) << ")"); } } @@ -2449,7 +2437,7 @@ ConnStateData::connFinishedWithConn(int size) { if (size == 0) { - if (getConcurrentRequestCount() == 0 && in.notYetUsed == 0) { + if (getConcurrentRequestCount() == 0 && in.buf.isEmpty()) { /* no current or pending requests */ debugs(33, 4, HERE << clientConnection << " closed"); return 1; @@ -2467,26 +2455,19 @@ void connNoteUseOfBuffer(ConnStateData* conn, size_t byteCount) { - assert(byteCount > 0 && byteCount <= conn->in.notYetUsed); - conn->in.notYetUsed -= byteCount; - debugs(33, 5, HERE << "conn->in.notYetUsed = " << conn->in.notYetUsed); - /* - * If there is still data that will be used, - * move it to the beginning. - */ - - if (conn->in.notYetUsed > 0) - memmove(conn->in.buf, conn->in.buf + byteCount, conn->in.notYetUsed); + assert(byteCount > 0 && byteCount <= conn->in.buf.length()); + conn->in.buf.consume(byteCount); + debugs(33, 5, "conn->in.buf has " << conn->in.buf.length() << " bytes unused."); } /// respond with ERR_TOO_BIG if request header exceeds request_header_max_size void ConnStateData::checkHeaderLimits() { - if (in.notYetUsed < Config.maxRequestHeaderSize) + if (in.buf.length() < Config.maxRequestHeaderSize) return; // can accumulte more header data - debugs(33, 3, "Request header is too large (" << in.notYetUsed << " > " << + debugs(33, 3, "Request header is too large (" << in.buf.length() << " > " << Config.maxRequestHeaderSize << " bytes)"); ClientSocketContext *context = parseHttpRequestAbort(this, "error:request-too-large"); @@ -2637,15 +2618,15 @@ assert (repContext); switch (hp->request_parse_status) { case Http::scHeaderTooLarge: - repContext->setReplyToError(ERR_TOO_BIG, Http::scBadRequest, method, http->uri, conn->clientConnection->remote, NULL, conn->in.buf, NULL); + repContext->setReplyToError(ERR_TOO_BIG, Http::scBadRequest, method, http->uri, conn->clientConnection->remote, NULL, conn->in.buf.c_str(), NULL); break; case Http::scMethodNotAllowed: repContext->setReplyToError(ERR_UNSUP_REQ, Http::scMethodNotAllowed, method, http->uri, - conn->clientConnection->remote, NULL, conn->in.buf, NULL); + conn->clientConnection->remote, NULL, conn->in.buf.c_str(), NULL); break; default: repContext->setReplyToError(ERR_INVALID_REQ, hp->request_parse_status, method, http->uri, - conn->clientConnection->remote, NULL, conn->in.buf, NULL); + conn->clientConnection->remote, NULL, conn->in.buf.c_str(), NULL); } assert(context->http->out.offset == 0); context->pullData(); @@ -2895,9 +2876,9 @@ static void connStripBufferWhitespace (ConnStateData * conn) { - while (conn->in.notYetUsed > 0 && xisspace(conn->in.buf[0])) { - memmove(conn->in.buf, conn->in.buf + 1, conn->in.notYetUsed - 1); - -- conn->in.notYetUsed; + // XXX: kill this whole function. + while (!conn->in.buf.isEmpty() && xisspace(conn->in.buf.at(0))) { + conn->in.buf.consume(1); } } @@ -2940,24 +2921,20 @@ // Loop while we have read bytes that are not needed for producing the body // On errors, bodyPipe may become nil, but readMore will be cleared - while (in.notYetUsed > 0 && !bodyPipe && flags.readMore) { + while (!in.buf.isEmpty() && !bodyPipe && flags.readMore) { connStripBufferWhitespace(this); /* Don't try to parse if the buffer is empty */ - if (in.notYetUsed == 0) + if (in.buf.isEmpty()) break; /* Limit the number of concurrent requests */ if (concurrentRequestQueueFilled()) break; - /* Should not be needed anymore */ - /* Terminate the string */ - in.buf[in.notYetUsed] = '\0'; - /* Begin the parsing */ PROF_start(parseHttpRequest); - HttpParserInit(&parser_, in.buf, in.notYetUsed); + HttpParserInit(&parser_, in.buf.c_str(), in.buf.length()); /* Process request */ Http::ProtocolVersion http_ver; @@ -3092,14 +3069,10 @@ bool ConnStateData::handleReadData(char *buf, size_t size) { - char *current_buf = in.addressToReadInto(); - - if (buf != current_buf) - memmove(current_buf, buf, size); - - in.notYetUsed += size; - - in.buf[in.notYetUsed] = '\0'; /* Terminate the string */ + // XXX: make this a no-op when buf given is the MemBlob free space. + assert(buf == in.buf.rawSpace(1)); + assert(size <= in.buf.spaceSize()); + in.buf.append(buf, size); // if we are reading a body, stuff data into the body pipe if (bodyPipe != NULL) @@ -3128,7 +3101,7 @@ } } else { // identity encoding debugs(33,5, HERE << "handling plain request body for " << clientConnection); - putSize = bodyPipe->putMoreData(in.buf, in.notYetUsed); + putSize = bodyPipe->putMoreData(in.buf.c_str(), in.buf.length()); if (!bodyPipe->mayNeedMoreData()) { // BodyPipe will clear us automagically when we produced everything bodyPipe = NULL; @@ -3158,17 +3131,17 @@ err_type ConnStateData::handleChunkedRequestBody(size_t &putSize) { - debugs(33,7, HERE << "chunked from " << clientConnection << ": " << in.notYetUsed); + debugs(33, 7, "chunked from " << clientConnection << ": " << in.buf.length()); try { // the parser will throw on errors - if (!in.notYetUsed) // nothing to do (MemBuf::init requires this check) + if (in.buf.isEmpty()) // nothing to do return ERR_NONE; MemBuf raw; // ChunkedCodingParser only works with MemBufs // add one because MemBuf will assert if it cannot 0-terminate - raw.init(in.notYetUsed, in.notYetUsed+1); - raw.append(in.buf, in.notYetUsed); + raw.init(in.buf.length(), in.buf.length()+1); + raw.append(in.buf.c_str(), in.buf.length()); const mb_size_t wasContentSize = raw.contentSize(); BodyPipeCheckout bpc(*bodyPipe); @@ -3308,7 +3281,7 @@ log_addr = xact->tcpClient->remote; log_addr.applyMask(Config.Addrs.client_netmask); - in.buf = (char *)memAllocBuf(CLIENT_REQ_BUF_SZ, &in.allocatedSize); + in.buf.reserveCapacity(CLIENT_REQ_BUF_SZ); if (port->disable_pmtu_discovery != DISABLE_PMTU_OFF && (transparent() || port->disable_pmtu_discovery == DISABLE_PMTU_ALWAYS)) { @@ -4345,7 +4318,7 @@ return -1; // probably need to read more, but we cannot be sure const int64_t needToProduce = bodyPipe->unproducedSize(); - const int64_t haveAvailable = static_cast<int64_t>(in.notYetUsed); + const int64_t haveAvailable = static_cast<int64_t>(in.buf.length()); if (needToProduce <= haveAvailable) return 0; // we have read what we need (but are waiting for pipe space) @@ -4415,20 +4388,13 @@ in.bodyParser = NULL; } -char * -ConnStateData::In::addressToReadInto() const -{ - return buf + notYetUsed; -} - -ConnStateData::In::In() : bodyParser(NULL), - buf (NULL), notYetUsed (0), allocatedSize (0) +ConnStateData::In::In() : + bodyParser(NULL), + buf() {} ConnStateData::In::~In() { - if (allocatedSize) - memFreeBuf(allocatedSize, buf); delete bodyParser; // TODO: pool } === modified file 'src/client_side.h' --- src/client_side.h 2014-01-05 19:49:23 +0000 +++ src/client_side.h 2014-03-08 00:30:18 +0000 @@ -189,14 +189,12 @@ ~ConnStateData(); void readSomeData(); - int getAvailableBufferLength() const; bool areAllContextsForThisConnection() const; void freeAllContexts(); void notifyAllContexts(const int xerrno); ///< tell everybody about the err /// Traffic parsing bool clientParseRequests(); void readNextRequest(); - bool maybeMakeSpaceAvailable(); ClientSocketContext::Pointer getCurrentContext() const; void addContextToQueue(ClientSocketContext * context); int getConcurrentRequestCount() const; @@ -212,12 +210,10 @@ struct In { In(); ~In(); - char *addressToReadInto() const; + bool maybeMakeSpaceAvailable(); ChunkedCodingParser *bodyParser; ///< parses chunked request body - char *buf; - size_t notYetUsed; - size_t allocatedSize; + SBuf buf; } in; /** number of body bytes we need to comm_read for the "current" request === modified file 'src/stat.cc' --- src/stat.cc 2014-01-24 01:57:15 +0000 +++ src/stat.cc 2014-03-07 12:47:12 +0000 @@ -2023,8 +2023,8 @@ storeAppendPrintf(s, "\tFD %d, read %" PRId64 ", wrote %" PRId64 "\n", fd, fd_table[fd].bytes_read, fd_table[fd].bytes_written); storeAppendPrintf(s, "\tFD desc: %s\n", fd_table[fd].desc); - storeAppendPrintf(s, "\tin: buf %p, offset %ld, size %ld\n", - conn->in.buf, (long int) conn->in.notYetUsed, (long int) conn->in.allocatedSize); + storeAppendPrintf(s, "\tin: buf %p, used %ld, free %ld\n", + conn->in.buf.c_str(), (long int) conn->in.buf.length(), (long int) conn->in.buf.spaceSize()); storeAppendPrintf(s, "\tremote: %s\n", conn->clientConnection->remote.toUrl(buf,MAX_IPSTRLEN)); storeAppendPrintf(s, "\tlocal: %s\n", === modified file 'src/tests/stub_client_side.cc' --- src/tests/stub_client_side.cc 2014-01-05 19:49:23 +0000 +++ src/tests/stub_client_side.cc 2014-03-08 00:32:24 +0000 @@ -29,19 +29,16 @@ void ClientSocketContext::writeControlMsg(HttpControlMsg &msg) STUB void ConnStateData::readSomeData() STUB -int ConnStateData::getAvailableBufferLength() const STUB_RETVAL(0) bool ConnStateData::areAllContextsForThisConnection() const STUB_RETVAL(false) void ConnStateData::freeAllContexts() STUB void ConnStateData::notifyAllContexts(const int xerrno) STUB bool ConnStateData::clientParseRequests() STUB_RETVAL(false) void ConnStateData::readNextRequest() STUB -bool ConnStateData::maybeMakeSpaceAvailable() STUB_RETVAL(false) void ConnStateData::addContextToQueue(ClientSocketContext * context) STUB int ConnStateData::getConcurrentRequestCount() const STUB_RETVAL(0) bool ConnStateData::isOpen() const STUB_RETVAL(false) void ConnStateData::checkHeaderLimits() STUB void ConnStateData::sendControlMsg(HttpControlMsg msg) STUB -char *ConnStateData::In::addressToReadInto() const STUB_RETVAL(NULL) int64_t ConnStateData::mayNeedToReadMoreBody() const STUB_RETVAL(0) #if USE_AUTH void ConnStateData::setAuth(const Auth::UserRequest::Pointer &aur, const char *cause) STUB @@ -76,6 +73,8 @@ bool ConnStateData::serveDelayedError(ClientSocketContext *context) STUB_RETVAL(false) #endif +bool ConnStateData::In::maybeMakeSpaceAvailable() STUB_RETVAL(false) + void setLogUri(ClientHttpRequest * http, char const *uri, bool cleanUrl) STUB const char *findTrailingHTTPVersion(const char *uriAndHTTPVersion, const char *end) STUB_RETVAL(NULL) int varyEvaluateMatch(StoreEntry * entry, HttpRequest * req) STUB_RETVAL(0)