Author: rhuijben
Date: Wed Nov 18 17:43:23 2015
New Revision: 1715026
URL: http://svn.apache.org/viewvc?rev=1715026&view=rev
Log:
Use the pump to write data for outgoing connections, by wrapping the pump
writer with a helper that switches streams in a few nasty places.
* outgoing.c
(no_more_writes): Remove function.
(serf__connection_flush) Reimplement as wrapper around
serf_pump__write.
* pump.c
(serf_pump__write): Complete variable rename and fix using the wrong 'pump'
variable (line 296).
* serf_private.h
(serf__connection_flush): Use bool argument.
Modified:
serf/trunk/outgoing.c
serf/trunk/pump.c
serf/trunk/serf_private.h
Modified: serf/trunk/outgoing.c
URL:
http://svn.apache.org/viewvc/serf/trunk/outgoing.c?rev=1715026&r1=1715025&r2=1715026&view=diff
==============================================================================
--- serf/trunk/outgoing.c (original)
+++ serf/trunk/outgoing.c Wed Nov 18 17:43:23 2015
@@ -485,23 +485,6 @@ apr_status_t serf__open_connections(serf
return APR_SUCCESS;
}
-static apr_status_t no_more_writes(serf_connection_t *conn)
-{
- /* Note that we should hold new requests until we open our new socket. */
- conn->state = SERF_CONN_CLOSING;
- serf__log(LOGLVL_DEBUG, LOGCOMP_CONN, __FILE__, conn->config,
- "stop writing on conn 0x%x\n", conn);
-
- /* Clear our iovec. */
- conn->pump.vec_len = 0;
-
- /* Update the pollset to know we don't want to write on this socket any
- * more.
- */
- serf_io__set_pollset_dirty(&conn->io);
- return APR_SUCCESS;
-}
-
/* Read the 'Connection' header from the response. Return SERF_ERROR_CLOSING if
* the header contains value 'close' indicating the server is closing the
* connection right after this response.
@@ -650,137 +633,33 @@ static apr_status_t reset_connection(ser
return APR_SUCCESS;
}
-static apr_status_t socket_writev(serf_pump_t *conn)
-{
- apr_size_t written;
- apr_status_t status;
-
- status = apr_socket_sendv(conn->skt, conn->vec,
- conn->vec_len, &written);
- if (status && !APR_STATUS_IS_EAGAIN(status))
- serf__log(LOGLVL_DEBUG, LOGCOMP_CONN, __FILE__, conn->config,
- "socket_sendv error %d\n", status);
-
- /* did we write everything? */
- if (written) {
- apr_size_t len = 0;
- int i;
-
- serf__log(LOGLVL_DEBUG, LOGCOMP_CONN, __FILE__, conn->config,
- "--- socket_sendv: %d bytes. --\n", written);
-
- for (i = 0; i < conn->vec_len; i++) {
- len += conn->vec[i].iov_len;
- if (written < len) {
- serf__log_nopref(LOGLVL_DEBUG, LOGCOMP_RAWMSG, conn->config,
- "%.*s", conn->vec[i].iov_len - (len -
written),
- conn->vec[i].iov_base);
- if (i) {
- memmove(conn->vec, &conn->vec[i],
- sizeof(struct iovec) * (conn->vec_len - i));
- conn->vec_len -= i;
- }
- conn->vec[0].iov_base = (char *)conn->vec[0].iov_base +
(conn->vec[0].iov_len - (len - written));
- conn->vec[0].iov_len = len - written;
- break;
- } else {
- serf__log_nopref(LOGLVL_DEBUG, LOGCOMP_RAWMSG, conn->config,
- "%.*s",
- conn->vec[i].iov_len, conn->vec[i].iov_base);
- }
- }
- if (len == written) {
- conn->vec_len = 0;
- }
- serf__log_nopref(LOGLVL_DEBUG, LOGCOMP_RAWMSG, conn->config, "\n");
-
- /* Log progress information */
- serf__context_progress_delta(conn->io->ctx, 0, written);
- }
-
- return status;
-}
apr_status_t serf__connection_flush(serf_connection_t *conn,
- int pump)
+ bool fetch_new)
{
- apr_status_t status = APR_SUCCESS;
- apr_status_t read_status = APR_SUCCESS;
- serf_bucket_t *ostreamh = NULL;
-
- conn->pump.hit_eof = FALSE;
-
- while (status == APR_SUCCESS) {
-
- /* First try to write out what is already stored in the
- connection vecs. */
- while (conn->pump.vec_len && !status) {
- status = socket_writev(&conn->pump);
-
- /* If the write would have blocked, then we're done.
- * Don't try to write anything else to the socket.
- */
- if (APR_STATUS_IS_EPIPE(status)
- || APR_STATUS_IS_ECONNRESET(status)
- || APR_STATUS_IS_ECONNABORTED(status))
- return no_more_writes(conn);
- }
+ apr_status_t status;
+ serf_bucket_t *tmp_bkt = NULL;
- if (status || !pump)
- return status;
- else if (read_status || conn->pump.vec_len || conn->pump.hit_eof)
- return read_status;
+ if (fetch_new) {
+ serf_bucket_t *ostreamh, *ostreamt;
- /* Ok, with the vecs written, we can now refill the per connection
- output vecs */
- if (!ostreamh) {
- serf_bucket_t *ostreamt;
-
- status = prepare_conn_streams(conn, &ostreamt, &ostreamh);
- if (status)
- return status;
- }
+ status = prepare_conn_streams(conn, &ostreamt, &ostreamh);
+ if (status)
+ return status;
- /* ### optimize at some point by using read_for_sendfile */
- /* TODO: now that read_iovec will effectively try to return as much
- data as available, we probably don't want to read ALL_AVAIL, but
- a lower number, like the size of one or a few TCP packets, the
- available TCP buffer size ... */
- conn->pump.hit_eof = 0;
- read_status = serf_bucket_read_iovec(ostreamh,
- SERF_READ_ALL_AVAIL,
- IOV_MAX,
- conn->pump.vec,
- &conn->pump.vec_len);
-
- if (read_status == SERF_ERROR_WAIT_CONN) {
- /* The bucket told us that it can't provide more data until
- more data is read from the socket. This normally happens
- during a SSL handshake.
-
- We should avoid looking for writability for a while so
- that (hopefully) something will appear in the bucket so
- we can actually write something. otherwise, we could
- end up in a CPU spin: socket wants something, but we
- don't have anything (and keep returning EAGAIN) */
- conn->pump.stop_writing = 1;
- serf_io__set_pollset_dirty(&conn->io);
+ tmp_bkt = conn->pump.ostream_head;
+ conn->pump.ostream_head = ostreamh;
+ }
- read_status = APR_EAGAIN;
- }
- else if (APR_STATUS_IS_EAGAIN(read_status)) {
+ status = serf_pump__write(&conn->pump, fetch_new);
- /* We read some stuff, but did we read everything ? */
- if (conn->pump.hit_eof)
- read_status = APR_SUCCESS;
- }
- else if (SERF_BUCKET_READ_ERROR(read_status)) {
-
- /* Something bad happened. Propagate any errors. */
- return read_status;
- }
+ if (fetch_new) {
+ conn->pump.ostream_head = tmp_bkt;
}
+ if (conn->pump.done_writing)
+ conn->state = SERF_CONN_CLOSING;
+
return status;
}
Modified: serf/trunk/pump.c
URL:
http://svn.apache.org/viewvc/serf/trunk/pump.c?rev=1715026&r1=1715025&r2=1715026&view=diff
==============================================================================
--- serf/trunk/pump.c (original)
+++ serf/trunk/pump.c Wed Nov 18 17:43:23 2015
@@ -274,16 +274,15 @@ apr_status_t serf_pump__write(serf_pump_
{
apr_status_t status = APR_SUCCESS;
apr_status_t read_status = APR_SUCCESS;
- serf_pump_t *const conn = pump;
- conn->hit_eof = FALSE;
+ pump->hit_eof = FALSE;
while (status == APR_SUCCESS) {
/* First try to write out what is already stored in the
connection vecs. */
- while (conn->vec_len && !status) {
- status = socket_writev(conn);
+ while (pump->vec_len && !status) {
+ status = socket_writev(pump);
/* If the write would have blocked, then we're done.
* Don't try to write anything else to the socket.
@@ -291,12 +290,12 @@ apr_status_t serf_pump__write(serf_pump_
if (APR_STATUS_IS_EPIPE(status)
|| APR_STATUS_IS_ECONNRESET(status)
|| APR_STATUS_IS_ECONNABORTED(status))
- return no_more_writes(conn);
+ return no_more_writes(pump);
}
- if (status || !pump)
+ if (status || !fetch_new)
return status;
- else if (read_status || conn->vec_len || conn->hit_eof)
+ else if (read_status || pump->vec_len || pump->hit_eof)
return read_status;
/* ### optimize at some point by using read_for_sendfile */
@@ -304,12 +303,12 @@ apr_status_t serf_pump__write(serf_pump_
data as available, we probably don't want to read ALL_AVAIL, but
a lower number, like the size of one or a few TCP packets, the
available TCP buffer size ... */
- conn->hit_eof = 0;
+ pump->hit_eof = false;
read_status = serf_bucket_read_iovec(pump->ostream_head,
SERF_READ_ALL_AVAIL,
IOV_MAX,
- conn->vec,
- &conn->vec_len);
+ pump->vec,
+ &pump->vec_len);
if (read_status == SERF_ERROR_WAIT_CONN) {
/* The bucket told us that it can't provide more data until
@@ -321,15 +320,15 @@ apr_status_t serf_pump__write(serf_pump_
we can actually write something. otherwise, we could
end up in a CPU spin: socket wants something, but we
don't have anything (and keep returning EAGAIN) */
- conn->stop_writing = true;
- serf_io__set_pollset_dirty(conn->io);
+ pump->stop_writing = true;
+ serf_io__set_pollset_dirty(pump->io);
read_status = APR_EAGAIN;
}
else if (APR_STATUS_IS_EAGAIN(read_status)) {
/* We read some stuff, but did we read everything ? */
- if (conn->hit_eof)
+ if (pump->hit_eof)
read_status = APR_SUCCESS;
}
else if (SERF_BUCKET_READ_ERROR(read_status)) {
@@ -341,4 +340,3 @@ apr_status_t serf_pump__write(serf_pump_
return status;
}
-
Modified: serf/trunk/serf_private.h
URL:
http://svn.apache.org/viewvc/serf/trunk/serf_private.h?rev=1715026&r1=1715025&r2=1715026&view=diff
==============================================================================
--- serf/trunk/serf_private.h (original)
+++ serf/trunk/serf_private.h Wed Nov 18 17:43:23 2015
@@ -682,7 +682,7 @@ serf_request_t *serf__ssltunnel_request_
void *setup_baton);
void serf__connection_set_pipelining(serf_connection_t *conn, int enabled);
apr_status_t serf__connection_flush(serf_connection_t *conn,
- int pump);
+ bool fetch_new);
apr_status_t serf__provide_credentials(serf_context_t *ctx,
char **username,