Send chunked responses if body size is unknown.

Apply HTTP chunked transfer encoding to the response body if all of the following conditions are met:

* client claims HTTP version 1.1 or later support
* response does not have a Content-Length header already
* response does not use multipart/byteranges encoding
* connection is persistent

If we decide to send a chunked reply, the chunked_reply flag is set. Chunked encoding is done in ClientSocketContext::packChunk(). The last-chunk is sent only when the clientReplyContext complete flag is set.

--------

This feature was requested to make Squid work with HTTP/1.1 clients that can handle chunked responses but cannot handle connection closures in the middle of a transaction sequence. The earlier version of the patch (for Squid v3.1) was tested in production.

N.B. A bug in Squid may result in server-side code not treating premature server-side connection termination as an error. That bug results in the Squid client side sending a complete chunked response to the client instead of omitting the last-chunk to indicate a truncated response. Fixing that bug is outside this project's scope (we may already have a patch for it somewhere; I need to check).

Send chunked responses if body size is unknown.

Apply HTTP chunked transfer encoding to the response body if all of
the following conditions are met:

* client claims HTTP version 1.1 or later support
* response does not have a Content-Length header already
* response does not use multipart/byteranges encoding
* connection is persistent

If we decide to send a chunked reply, the chunked_reply flag is set.
Chunked encoding is done in ClientSocketContext::packChunk(). The
last-chunk is sent only when the clientReplyContext complete flag is set.

=== modified file 'compat/types.h'
--- compat/types.h	2010-07-09 13:23:03 +0000
+++ compat/types.h	2010-08-19 18:25:20 +0000
@@ -99,37 +99,47 @@
 #ifndef PRId64
 #ifdef _SQUID_MSWIN_		/* Windows native port using MSVCRT */
 #define PRId64 "I64d"
 #elif SIZEOF_INT64_T > SIZEOF_LONG
 #define PRId64 "lld"
 #else
 #define PRId64 "ld"
 #endif
 #endif
 
 #ifndef PRIu64
 #ifdef _SQUID_MSWIN_		/* Windows native port using MSVCRT */
 #define PRIu64 "I64u"
 #elif SIZEOF_INT64_T > SIZEOF_LONG
 #define PRIu64 "llu"
 #else
 #define PRIu64 "lu"
 #endif
 #endif
 
+#ifndef PRIX64
+#ifdef _SQUID_MSWIN_		/* Windows native port using MSVCRT */
+#define PRIX64 "I64X"
+#elif SIZEOF_INT64_T > SIZEOF_LONG
+#define PRIX64 "llX"
+#else
+#define PRIX64 "lX"
+#endif
+#endif
+
 #ifndef HAVE_MODE_T
 typedef unsigned short mode_t;
 #endif
 
 #ifndef HAVE_FD_MASK
 typedef unsigned long fd_mask;
 #endif
 
 #ifndef HAVE_SOCKLEN_T
 typedef int socklen_t;
 #endif
 
 #ifndef HAVE_MTYP_T
 typedef long mtyp_t;
 #endif
 
 #endif /* SQUID_TYPES_H */

=== modified file 'src/client_side.cc'
--- src/client_side.cc	2010-08-18 23:43:22 +0000
+++ src/client_side.cc	2010-08-20 03:50:08 +0000
@@ -875,62 +875,81 @@ ClientSocketContext::noteSentBodyBytes(s
     assert (http->range_iter.debt() >= -1);
 }
 
 bool
 ClientHttpRequest::multipartRangeRequest() const
 {
     return request->multipartRangeRequest();
 }
 
 bool
 ClientSocketContext::multipartRangeRequest() const
 {
     return http->multipartRangeRequest();
 }
 
 void
 ClientSocketContext::sendBody(HttpReply * rep, StoreIOBuffer bodyData)
 {
     assert(rep == NULL);
 
-    if (!multipartRangeRequest()) {
+    if (!multipartRangeRequest() && !http->request->flags.chunked_reply) {
         size_t length = lengthToSend(bodyData.range());
         noteSentBodyBytes (length);
         AsyncCall::Pointer call = commCbCall(33, 5, "clientWriteBodyComplete",
                                              CommIoCbPtrFun(clientWriteBodyComplete, this));
         comm_write(fd(), bodyData.data, length, call );
         return;
     }
 
     MemBuf mb;
     mb.init();
-    packRange(bodyData, &mb);
+    if (multipartRangeRequest())
+        packRange(bodyData, &mb);
+    else
+        packChunk(bodyData, mb);
 
     if (mb.contentSize()) {
         /* write */
         AsyncCall::Pointer call = commCbCall(33, 5, "clientWriteComplete",
                                              CommIoCbPtrFun(clientWriteComplete, this));
         comm_write_mbuf(fd(), &mb, call);
     }  else
         writeComplete(fd(), NULL, 0, COMM_OK);
 }
 
+/**
+ * Appends one chunk (uppercase-hex size line, data, CRLF) built from bodyData
+ * to mb. Packs the last-chunk (size "0") if bodyData is empty.
+ */
+void
+ClientSocketContext::packChunk(const StoreIOBuffer &bodyData, MemBuf &mb)
+{
+    const uint64_t length =
+        static_cast<uint64_t>(lengthToSend(bodyData.range()));
+    noteSentBodyBytes(length);
+    // spaces around PRIX64 keep C++11 from lexing it as a literal suffix
+    mb.Printf("%" PRIX64 "\r\n", length);
+    mb.append(bodyData.data, length);
+    mb.Printf("\r\n");
+}
+
 /** put terminating boundary for multiparts */
 static void
 clientPackTermBound(String boundary, MemBuf * mb)
 {
     mb->Printf("\r\n--" SQUIDSTRINGPH "--\r\n", SQUIDSTRINGPRINT(boundary));
     debugs(33, 6, "clientPackTermBound: buf offset: " << mb->size);
 }
 
 /** appends a "part" HTTP header (as in a multi-part/range reply) to the buffer */
 static void
 clientPackRangeHdr(const HttpReply * rep, const HttpHdrRangeSpec * spec, String boundary, MemBuf * mb)
 {
     HttpHeader hdr(hoReply);
     Packer p;
     assert(rep);
     assert(spec);
 
     /* put boundary */
     debugs(33, 5, "clientPackRangeHdr: appending boundary: " << boundary);
     /* rfc2046 requires to _prepend_ boundary with <crlf>! */
@@ -1265,47 +1284,49 @@ ClientSocketContext::prepareReply(HttpRe
     reply = rep;
 
     if (http->request->range)
         buildRangeHeader(rep);
 }
 
 void
 ClientSocketContext::sendStartOfMessage(HttpReply * rep, StoreIOBuffer bodyData)
 {
     prepareReply(rep);
     assert (rep);
     MemBuf *mb = rep->pack();
     /* Save length of headers for persistent conn checks */
     http->out.headers_sz = mb->contentSize();
 #if HEADERS_LOG
 
     headersLog(0, 0, http->request->method, rep);
 #endif
 
     if (bodyData.data && bodyData.length) {
-        if (!multipartRangeRequest()) {
+        if (multipartRangeRequest())
+            packRange(bodyData, mb);
+        else if (http->request->flags.chunked_reply) {
+            packChunk(bodyData, *mb);
+        } else {
             size_t length = lengthToSend(bodyData.range());
             noteSentBodyBytes (length);
 
             mb->append(bodyData.data, length);
-        } else {
-            packRange(bodyData, mb);
         }
     }
 
     /* write */
     debugs(33,7, HERE << "sendStartOfMessage schedules clientWriteComplete");
     AsyncCall::Pointer call = commCbCall(33, 5, "clientWriteComplete",
                                          CommIoCbPtrFun(clientWriteComplete, this));
     comm_write_mbuf(fd(), mb, call);
 
     delete mb;
 }
 
 /**
  * Write a chunk of data to a client socket. If the reply is present,
  * send the reply headers down the wire too, and clean them up when
  * finished.
  * Pre-condition:
  *   The request is one backed by a connection, not an internal request.
  *   data context is not NULL
  *   There are no more entries in the stream chain.
@@ -1320,41 +1341,45 @@ clientSocketRecipient(clientStreamNode *
     PROF_start(clientSocketRecipient);
     /* TODO: handle this rather than asserting
      * - it should only ever happen if we cause an abort and
      * the callback chain loops back to here, so we can simply return.
      * However, that itself shouldn't happen, so it stays as an assert for now.
      */
     assert(cbdataReferenceValid(node));
     assert(node->node.next == NULL);
     ClientSocketContext::Pointer context = dynamic_cast<ClientSocketContext *>(node->data.getRaw());
     assert(context != NULL);
     assert(connIsUsable(http->getConn()));
     fd = http->getConn()->fd;
     /* TODO: check offset is what we asked for */
 
     if (context != http->getConn()->getCurrentContext()) {
         context->deferRecipientForLater(node, rep, receivedData);
         PROF_stop(clientSocketRecipient);
         return;
     }
 
-    if (responseFinishedOrFailed(rep, receivedData)) {
+    // After sending Transfer-Encoding: chunked (at least), always send
+    // the last-chunk if there was no error, ignoring responseFinishedOrFailed.
+    const bool mustSendLastChunk = http->request->flags.chunked_reply &&
+        !http->request->flags.stream_error && !context->startOfOutput();
+    if (responseFinishedOrFailed(rep, receivedData) && !mustSendLastChunk) {
         context->writeComplete(fd, NULL, 0, COMM_OK);
         PROF_stop(clientSocketRecipient);
         return;
     }
 
     if (!context->startOfOutput())
         context->sendBody(rep, receivedData);
     else {
         assert(rep);
         http->al.reply = HTTPMSGLOCK(rep);
         context->sendStartOfMessage(rep, receivedData);
     }
 
     PROF_stop(clientSocketRecipient);
 }
 
 /**
  * Called when a downstream node is no longer interested in
  * our data. As we are a terminal node, this means on aborts
  * only

=== modified file 'src/client_side.h'
--- src/client_side.h	2010-07-30 20:11:20 +0000
+++ src/client_side.h	2010-08-19 18:26:35 +0000
@@ -98,40 +98,41 @@ public:
     bool canPackMoreRanges() const;
     clientStream_status_t socketState();
     void sendBody(HttpReply * rep, StoreIOBuffer bodyData);
     void sendStartOfMessage(HttpReply * rep, StoreIOBuffer bodyData);
     size_t lengthToSend(Range<int64_t> const &available);
     void noteSentBodyBytes(size_t);
     void buildRangeHeader(HttpReply * rep);
     int fd() const;
     clientStreamNode * getTail() const;
     clientStreamNode * getClientReplyContext() const;
     void connIsFinished();
     void removeFromConnectionList(ConnStateData * conn);
     void deferRecipientForLater(clientStreamNode * node, HttpReply * rep, StoreIOBuffer receivedData);
     bool multipartRangeRequest() const;
     void registerWithConn();
     void noteIoError(const int xerrno); ///< update state to reflect I/O error
 
 private:
     CBDATA_CLASS(ClientSocketContext);
     void prepareReply(HttpReply * rep);
+    void packChunk(const StoreIOBuffer &bodyData, MemBuf &mb);
     void packRange(StoreIOBuffer const &, MemBuf * mb);
     void deRegisterWithConn();
     void doClose();
     void initiateClose(const char *reason);
     bool mayUseConnection_; /* This request may use the connection. Don't read anymore requests for now */
     bool connRegistered_;
 };
 
 
 class ConnectionDetail;
 
 /** A connection to a socket */
 class ConnStateData : public BodyProducer/*, public RefCountable*/
 {
 
 public:
 
     ConnStateData();
     ~ConnStateData();
 

=== modified file 'src/client_side_reply.cc'
--- src/client_side_reply.cc	2010-08-14 02:58:39 +0000
+++ src/client_side_reply.cc	2010-08-20 04:12:18 +0000
@@ -958,40 +958,45 @@ clientReplyContext::traceReply(clientStr
     http->storeEntry()->complete();
 }
 
 #define SENDING_BODY 0
 #define SENDING_HDRSONLY 1
 int
 clientReplyContext::checkTransferDone()
 {
     StoreEntry *entry = http->storeEntry();
 
     if (entry == NULL)
         return 0;
 
     /*
      * For now, 'done_copying' is used for special cases like
      * Range and HEAD requests.
      */
     if (http->flags.done_copying)
         return 1;
 
+    if (http->request->flags.chunked_reply && !flags.complete) {
+        // last-chunk was not sent
+        return 0;
+    }
+
     /*
      * Handle STORE_OK objects.
      * objectLen(entry) will be set proprely.
      * RC: Does objectLen(entry) include the Headers?
      * RC: Yes.
      */
     if (entry->store_status == STORE_OK) {
         return storeOKTransferDone();
     } else {
         return storeNotOKTransferDone();
     }
 }
 
 int
 clientReplyContext::storeOKTransferDone() const
 {
     if (http->out.offset >= http->storeEntry()->objectLen() - headers_sz) {
         debugs(88,3,HERE << "storeOKTransferDone " <<
                " out.offset=" << http->out.offset <<
                " objectLen()=" << http->storeEntry()->objectLen() <<
@@ -1102,51 +1107,53 @@ clientReplyContext::replyStatus()
     /* Here because lower nodes don't need it */
 
     if (http->storeEntry() == NULL) {
         debugs(88, 5, "clientReplyStatus: no storeEntry");
         return STREAM_FAILED;	/* yuck, but what can we do? */
     }
 
     if (EBIT_TEST(http->storeEntry()->flags, ENTRY_ABORTED)) {
         /* TODO: Could upstream read errors (result.flags.error) be
          * lost, and result in undersize requests being considered
          * complete. Should we tcp reset such connections ?
          */
         debugs(88, 5, "clientReplyStatus: aborted storeEntry");
         return STREAM_FAILED;
     }
 
     if ((done = checkTransferDone()) != 0 || flags.complete) {
         debugs(88, 5, "clientReplyStatus: transfer is DONE");
         /* Ok we're finished, but how? */
 
-        if (http->storeEntry()->getReply()->bodySize(http->request->method) < 0) {
+        const int64_t expectedBodySize =
+            http->storeEntry()->getReply()->bodySize(http->request->method);
+        if (!http->request->flags.proxy_keepalive && expectedBodySize < 0) {
             debugs(88, 5, "clientReplyStatus: closing, content_length < 0");
             return STREAM_FAILED;
         }
 
         if (!done) {
             debugs(88, 5, "clientReplyStatus: closing, !done, but read 0 bytes");
             return STREAM_FAILED;
         }
 
-        if (!http->gotEnough()) {
+        if (expectedBodySize >= 0 && !http->gotEnough()) {
             debugs(88, 5, "clientReplyStatus: client didn't get all it expected");
             return STREAM_UNPLANNED_COMPLETE;
         }
 
         if (http->request->flags.proxy_keepalive) {
             debugs(88, 5, "clientReplyStatus: stream complete and can keepalive");
             return STREAM_COMPLETE;
         }
 
         debugs(88, 5, "clientReplyStatus: stream was not expected to complete!");
         return STREAM_UNPLANNED_COMPLETE;
     }
 
     // XXX: Should this be checked earlier? We could return above w/o checking.
     if (reply->receivedBodyTooLarge(*http->request, http->out.offset - 4096)) {
         /* 4096 is a margin for the HTTP headers included in out.offset */
         debugs(88, 5, "clientReplyStatus: client reply body is too large");
         return STREAM_FAILED;
     }
 
@@ -1347,61 +1354,72 @@ clientReplyContext::buildReplyHeader()
          * depends on authenticate behaviour: all schemes to date send no extra
          * data on 407/401 responses, and do not check the accel state on 401/407
          * responses
          */
         authenticateFixHeader(reply, request->auth_user_request, request, 0, 1);
     } else if (request->auth_user_request != NULL)
         authenticateFixHeader(reply, request->auth_user_request, request, http->flags.accel, 0);
 
     /* Append X-Cache */
     httpHeaderPutStrf(hdr, HDR_X_CACHE, "%s from %s",
                       is_hit ? "HIT" : "MISS", getMyHostname());
 
 #if USE_CACHE_DIGESTS
     /* Append X-Cache-Lookup: -- temporary hack, to be removed @?@ @?@ */
     httpHeaderPutStrf(hdr, HDR_X_CACHE_LOOKUP, "%s from %s:%d",
                       lookup_type ? lookup_type : "NONE",
                       getMyHostname(), getMyPort());
 
 #endif
 
+    const bool maySendChunkedReply = !request->multipartRangeRequest() && // client must claim HTTP/1.1 or later
+        (request->http_ver.major > 1 || (request->http_ver.major == 1 && request->http_ver.minor >= 1));
+
     /* Check whether we should send keep-alive */
     if (!Config.onoff.error_pconns && reply->sline.status >= 400 && !request->flags.must_keepalive) {
         debugs(33, 3, "clientBuildReplyHeader: Error, don't keep-alive");
         request->flags.proxy_keepalive = 0;
     } else if (!Config.onoff.client_pconns && !request->flags.must_keepalive) {
         debugs(33, 2, "clientBuildReplyHeader: Connection Keep-Alive not requested by admin or client");
         request->flags.proxy_keepalive = 0;
     } else if (request->flags.proxy_keepalive && shutting_down) {
         debugs(88, 3, "clientBuildReplyHeader: Shutting down, don't keep-alive.");
         request->flags.proxy_keepalive = 0;
     } else if (request->flags.connection_auth && !reply->keep_alive) {
         debugs(33, 2, "clientBuildReplyHeader: Connection oriented auth but server side non-persistent");
         request->flags.proxy_keepalive = 0;
-    } else if (reply->bodySize(request->method) < 0) {
+    } else if (reply->bodySize(request->method) < 0 && !maySendChunkedReply) {
         debugs(88, 3, "clientBuildReplyHeader: can't keep-alive, unknown body size" );
         request->flags.proxy_keepalive = 0;
     } else if (fdUsageHigh()&& !request->flags.must_keepalive) {
         debugs(88, 3, "clientBuildReplyHeader: Not many unused FDs, can't keep-alive");
         request->flags.proxy_keepalive = 0;
     }
 
+    // Decide if we send chunked reply
+    if (maySendChunkedReply &&
+        request->flags.proxy_keepalive &&
+        reply->bodySize(request->method) < 0) {
+        debugs(88, 3, "clientBuildReplyHeader: chunked reply");
+        request->flags.chunked_reply = 1;
+        hdr->putStr(HDR_TRANSFER_ENCODING, "chunked");
+    }
 
     /* Append VIA */
     if (Config.onoff.via) {
         LOCAL_ARRAY(char, bbuf, MAX_URL + 32);
         String strVia;
         hdr->getList(HDR_VIA, &strVia);
         snprintf(bbuf, MAX_URL + 32, "%d.%d %s",
                  reply->sline.version.major,
                  reply->sline.version.minor,
                  ThisCache);
         strListAdd(&strVia, bbuf, ',');
         hdr->delById(HDR_VIA);
         hdr->putStr(HDR_VIA, strVia.termedBuf());
     }
     /* Signal keep-alive or close explicitly */
     hdr->putStr(HDR_CONNECTION, request->flags.proxy_keepalive ? "keep-alive" : "close");
 
 #if ADD_X_REQUEST_URI
     /*
      * Knowing the URI of the request is useful when debugging persistent
@@ -1710,40 +1728,41 @@ clientReplyContext::makeThisHead()
 bool
 clientReplyContext::errorInStream(StoreIOBuffer const &result, size_t const &sizeToProcess)const
 {
     return /* aborted request */
         (http->storeEntry() && EBIT_TEST(http->storeEntry()->flags, ENTRY_ABORTED)) ||
         /* Upstream read error */ (result.flags.error) ||
         /* Upstream EOF */ (sizeToProcess == 0);
 }
 
 void
 clientReplyContext::sendStreamError(StoreIOBuffer const &result)
 {
     /** call clientWriteComplete so the client socket gets closed
      *
      * We call into the stream, because we don't know that there is a
      * client socket!
      */
     debugs(88, 5, "clientReplyContext::sendStreamError: A stream error has occured, marking as complete and sending no data.");
     StoreIOBuffer localTempBuffer;
     flags.complete = 1;
+    http->request->flags.stream_error = 1;
     localTempBuffer.flags.error = result.flags.error;
     clientStreamCallback((clientStreamNode*)http->client_stream.head->data, http, NULL,
                          localTempBuffer);
 }
 
 void
 clientReplyContext::pushStreamData(StoreIOBuffer const &result, char *source)
 {
     StoreIOBuffer localTempBuffer;
 
     if (result.length == 0) {
         debugs(88, 5, "clientReplyContext::pushStreamData: marking request as complete due to 0 length store result");
         flags.complete = 1;
     }
 
     assert(result.offset - headers_sz == next()->readBuffer.offset);
     localTempBuffer.offset = result.offset - headers_sz;
     localTempBuffer.length = result.length;
 
     if (localTempBuffer.length)

=== modified file 'src/structs.h'
--- src/structs.h	2010-08-07 14:22:54 +0000
+++ src/structs.h	2010-08-20 03:43:41 +0000
@@ -986,78 +986,80 @@ struct _netdbEntry {
     int n_peers_alloc;
     int n_peers;
 };
 
 
 struct _iostats {
 
     struct {
         int reads;
         int reads_deferred;
         int read_hist[16];
         int writes;
         int write_hist[16];
     }
 
     Http, Ftp, Gopher;
 };
 
 
 struct request_flags {
-    request_flags(): range(0),nocache(0),ims(0),auth(0),cachable(0),hierarchical(0),loopdetect(0),proxy_keepalive(0),proxying(0),refresh(0),redirected(0),need_validation(0),accelerated(0),ignore_cc(0),intercepted(0),spoof_client_ip(0),internal(0),internalclient(0),must_keepalive(0),destinationIPLookedUp_(0) {
+    request_flags(): range(0),nocache(0),ims(0),auth(0),cachable(0),hierarchical(0),loopdetect(0),proxy_keepalive(0),proxying(0),refresh(0),redirected(0),need_validation(0),accelerated(0),ignore_cc(0),intercepted(0),spoof_client_ip(0),internal(0),internalclient(0),must_keepalive(0),chunked_reply(0),stream_error(0),destinationIPLookedUp_(0) {
 #if USE_HTTP_VIOLATIONS
         nocache_hack = 0;
 #endif
 #if FOLLOW_X_FORWARDED_FOR
         done_follow_x_forwarded_for = 0;
 #endif /* FOLLOW_X_FORWARDED_FOR */
     }
 
     unsigned int range:1;
     unsigned int nocache:1;
     unsigned int ims:1;
     unsigned int auth:1;
     unsigned int cachable:1;
     unsigned int hierarchical:1;
     unsigned int loopdetect:1;
     unsigned int proxy_keepalive:1;
 unsigned int proxying:
     1;	/* this should be killed, also in httpstateflags */
     unsigned int refresh:1;
     unsigned int redirected:1;
     unsigned int need_validation:1;
 #if USE_HTTP_VIOLATIONS
     unsigned int nocache_hack:1;	/* for changing/ignoring no-cache requests */
 #endif
     unsigned int accelerated:1;
     unsigned int ignore_cc:1;
     unsigned int intercepted:1;  /**< transparently intercepted request */
     unsigned int spoof_client_ip:1;  /**< spoof client ip if possible */
     unsigned int internal:1;
     unsigned int internalclient:1;
     unsigned int must_keepalive:1;
     unsigned int connection_auth:1; /** Request wants connection oriented auth */
     unsigned int connection_auth_disabled:1; /** Connection oriented auth can not be supported */
     unsigned int connection_proxy_auth:1; /** Request wants connection oriented auth */
     unsigned int pinned:1;      /* Request sent on a pinned connection */
     unsigned int auth_sent:1;   /* Authentication forwarded */
     unsigned int no_direct:1;	/* Deny direct forwarding unless overriden by always_direct. Used in accelerator mode */
+    unsigned int chunked_reply:1; /**< Reply with chunked transfer encoding */
+    unsigned int stream_error:1; /**< Whether stream error has occurred */
 
     // When adding new flags, please update cloneAdaptationImmune() as needed.
 
     bool resetTCP() const;
     void setResetTCP();
     void clearResetTCP();
     void destinationIPLookupCompleted();
     bool destinationIPLookedUp() const;
 
     // returns a partial copy of the flags that includes only those flags
     // that are safe for a related (e.g., ICAP-adapted) request to inherit
     request_flags cloneAdaptationImmune() const;
 
 #if FOLLOW_X_FORWARDED_FOR
     unsigned int done_follow_x_forwarded_for;
 #endif /* FOLLOW_X_FORWARDED_FOR */
 private:
 
     unsigned int reset_tcp:1;
     unsigned int destinationIPLookedUp_:1;

Reply via email to