Author: rhuijben
Date: Mon Nov 23 17:03:04 2015
New Revision: 1715887
URL: http://svn.apache.org/viewvc?rev=1715887&view=rev
Log:
* incoming.c
On the investigate branch: switch back to the old writing code, from
before introducing the pump.
Modified:
serf/branches/pump-investigate/incoming.c
Modified: serf/branches/pump-investigate/incoming.c
URL:
http://svn.apache.org/viewvc/serf/branches/pump-investigate/incoming.c?rev=1715887&r1=1715886&r2=1715887&view=diff
==============================================================================
--- serf/branches/pump-investigate/incoming.c (original)
+++ serf/branches/pump-investigate/incoming.c Mon Nov 23 17:03:04 2015
@@ -350,10 +350,144 @@ static apr_status_t read_from_client(ser
return status;
}
+/* Try to send the iovecs pending in CLIENT->vec over CLIENT->skt.
+ *
+ * After a (partial) write, the vectors that were sent completely are
+ * dropped from CLIENT->vec and the first remaining vector is advanced
+ * past the bytes that were already sent. When everything was sent,
+ * CLIENT->vec_len is reset to 0. The bytes that were written are echoed
+ * to the LOGCOMP_RAWMSG log component and the byte count is passed to
+ * serf__context_progress_delta().
+ *
+ * Returns the status of apr_socket_sendv() unchanged.
+ */
+static apr_status_t socket_writev(serf_incoming_t *client)
+{
+    /* Initialised so the check below is well defined even when the send
+       fails without storing a byte count. */
+    apr_size_t written = 0;
+    apr_status_t status;
+
+    status = apr_socket_sendv(client->skt, client->vec,
+                              client->vec_len, &written);
+    if (status && !APR_STATUS_IS_EAGAIN(status))
+        serf__log(LOGLVL_DEBUG, LOGCOMP_CONN, __FILE__, client->config,
+                  "socket_sendv error %d\n", status);
+
+    /* Did we write anything? If so, consume it from the pending vecs. */
+    if (written) {
+        apr_size_t len = 0;
+        int i;
+
+        serf__log(LOGLVL_DEBUG, LOGCOMP_CONN, __FILE__, client->config,
+                  "--- socket_sendv: %" APR_SIZE_T_FMT " bytes. --\n",
+                  written);
+
+        for (i = 0; i < client->vec_len; i++) {
+            len += client->vec[i].iov_len;
+            if (written < len) {
+                /* vec[i] was only partially sent: REMAINING bytes of it
+                   are still pending, SENT bytes went out. */
+                apr_size_t remaining = len - written;
+                apr_size_t sent = client->vec[i].iov_len - remaining;
+
+                /* The precision argument of "%.*s" must be an int. */
+                serf__log_nopref(LOGLVL_DEBUG, LOGCOMP_RAWMSG, client->config,
+                                 "%.*s", (int)sent,
+                                 (const char *)client->vec[i].iov_base);
+                if (i) {
+                    /* Drop the fully written vecs in front of vec[i]. */
+                    memmove(client->vec, &client->vec[i],
+                            sizeof(struct iovec) * (client->vec_len - i));
+                    client->vec_len -= i;
+                }
+                client->vec[0].iov_base =
+                    (char *)client->vec[0].iov_base + sent;
+                client->vec[0].iov_len = remaining;
+                break;
+            } else {
+                serf__log_nopref(LOGLVL_DEBUG, LOGCOMP_RAWMSG, client->config,
+                                 "%.*s", (int)client->vec[i].iov_len,
+                                 (const char *)client->vec[i].iov_base);
+            }
+        }
+        if (len == written) {
+            /* Everything that was pending has been sent. */
+            client->vec_len = 0;
+        }
+        serf__log_nopref(LOGLVL_DEBUG, LOGCOMP_RAWMSG, client->config, "\n");
+
+        /* Log progress information */
+        serf__context_progress_delta(client->ctx, 0, written);
+    }
+
+    return status;
+}
+
+/* Stop writing to CLIENT: log the event, discard the pending iovecs and
+ * mark the pollset dirty so it gets recalculated.
+ *
+ * Always returns APR_SUCCESS.
+ */
+static apr_status_t no_more_writes(serf_incoming_t *client)
+{
+    /* Note that we should hold new requests until we open our new socket. */
+    /* A pointer must be printed with %p; passing it to %x is undefined
+       and truncates the value on LP64 platforms. */
+    serf__log(LOGLVL_DEBUG, LOGCOMP_CONN, __FILE__, client->config,
+              "stop writing on client %p\n", (void *)client);
+
+    /* Clear our iovec. */
+    client->vec_len = 0;
+
+    /* Update the pollset to know we don't want to write on this socket any
+     * more.
+     */
+    serf_io__set_pollset_dirty(&client->io);
+    return APR_SUCCESS;
+}
+
+/* Write the data pending for CLIENT to its socket.
+ *
+ * The iovecs already stored in CLIENT->vec are written first, using
+ * socket_writev(). When PUMP is true and those vecs were written
+ * completely, CLIENT->vec is refilled from CLIENT->pump.ostream_head with
+ * serf_bucket_read_iovec() and the write is retried, until either the
+ * write or the read reports a non-success status.
+ *
+ * Returns:
+ * - the result of no_more_writes() when the write failed with EPIPE,
+ *   ECONNRESET or ECONNABORTED;
+ * - the write status when it is not APR_SUCCESS, or when PUMP is false;
+ * - otherwise the status of the last bucket read, where
+ *   SERF_ERROR_WAIT_CONN is reported as APR_EAGAIN.
+ */
apr_status_t serf__incoming_client_flush(serf_incoming_t *client,
bool pump)
{
- return serf_pump__write(&client->pump, pump);
+ apr_status_t status = APR_SUCCESS;
+ apr_status_t read_status = APR_SUCCESS;
+ serf_bucket_t *ostreamh = client->pump.ostream_head;
+
+ client->pump.hit_eof = FALSE;
+
+ while (status == APR_SUCCESS) {
+
+ /* First try to write out what is already stored in the
+ connection vecs. */
+ while (client->vec_len && !status) {
+ status = socket_writev(client);
+
+ /* If the peer closed or reset the connection, stop writing
+ * on this socket. Any other non-success status (including
+ * "would block") ends this loop and is returned below.
+ */
+ if (APR_STATUS_IS_EPIPE(status)
+ || APR_STATUS_IS_ECONNRESET(status)
+ || APR_STATUS_IS_ECONNABORTED(status))
+ return no_more_writes(client);
+ }
+
+ /* Stop when the write did not succeed or the caller does not want
+ more data pumped from the output stream; also stop when the previous
+ read did not return APR_SUCCESS, vecs are still pending, or
+ hit_eof is set. */
+ if (status || !pump)
+ return status;
+ else if (read_status || client->vec_len || client->pump.hit_eof)
+ return read_status;
+
+ /* ### optimize at some point by using read_for_sendfile */
+ /* TODO: now that read_iovec will effectively try to return as much
+ data as available, we probably don't want to read ALL_AVAIL, but
+ a lower number, like the size of one or a few TCP packets, the
+ available TCP buffer size ... */
+ /* NOTE(review): hit_eof is cleared here and tested again after the
+ read; presumably it is set from within the bucket read - confirm. */
+ client->pump.hit_eof = 0;
+ read_status = serf_bucket_read_iovec(ostreamh,
+ SERF_READ_ALL_AVAIL,
+ IOV_MAX,
+ client->vec,
+ &client->vec_len);
+
+ if (read_status == SERF_ERROR_WAIT_CONN) {
+ /* The bucket told us that it can't provide more data until
+ more data is read from the socket. This normally happens
+ during a SSL handshake.
+
+ We should avoid looking for writability for a while so
+ that (hopefully) something will appear in the bucket so
+ we can actually write something. otherwise, we could
+ end up in a CPU spin: socket wants something, but we
+ don't have anything (and keep returning EAGAIN) */
+ client->pump.stop_writing = true;
+ serf_io__set_pollset_dirty(&client->io);
+
+ read_status = APR_EAGAIN;
+ }
+ else if (APR_STATUS_IS_EAGAIN(read_status)) {
+
+ /* We read some stuff, but did we read everything ? */
+ if (client->pump.hit_eof)
+ read_status = APR_SUCCESS;
+ }
+ else if (SERF_BUCKET_READ_ERROR(read_status)) {
+
+ /* Something bad happened. Propagate any errors. */
+ return read_status;
+ }
+ }
+
+ /* Every path that leaves the loop above returns from inside it. */
+ return status;
}
static apr_status_t write_to_client(serf_incoming_t *client)
@@ -387,7 +521,7 @@ void serf_incoming_set_framing_type(
if (client->skt) {
serf_io__set_pollset_dirty(&client->io);
- client->pump.stop_writing = false;
+ client->pump.stop_writing = 0;
/* Close down existing protocol */
if (client->protocol_baton && client->perform_teardown) {
@@ -714,7 +848,6 @@ apr_status_t serf__incoming_update_polls
}
client->ctx->incomings->nelts--;
apr_pool_destroy(client->pool);
-#endif
if (cid >= ctx->incomings->nelts) {
/* We skipped updating the pollset on this item as we moved it.
@@ -722,6 +855,7 @@ apr_status_t serf__incoming_update_polls
return serf__incoming_update_pollset(GET_INCOMING(ctx, cid));
}
+#endif
return APR_SUCCESS;
}
@@ -752,8 +886,38 @@ apr_status_t serf__incoming_update_polls
But it also has the nice side effect of removing references
from the aggregate to requests that are done.
*/
+ if (client->vec_len) {
+ /* We still have vecs in the connection, which lifetime is
+ managed by buckets inside client->ostream_head.
- data_waiting = serf_pump__data_pending(&client->pump);
+ Don't touch ostream as that might destroy the vecs */
+
+ data_waiting = true;
+ }
+ else {
+ serf_bucket_t *ostream;
+
+ ostream = client->pump.ostream_head;
+
+ if (ostream) {
+ const char *dummy_data;
+ apr_size_t len;
+
+ status = serf_bucket_peek(ostream, &dummy_data, &len);
+
+ if (SERF_BUCKET_READ_ERROR(status) || len > 0) {
+ /* DATA or error waiting */
+ data_waiting = TRUE; /* Error waiting */
+ }
+ else if (! status || APR_STATUS_IS_EOF(status)) {
+ data_waiting = FALSE;
+ }
+ else
+ data_waiting = FALSE; /* EAGAIN / EOF / WAIT_CONN */
+ }
+ else
+ data_waiting = FALSE;
+ }
if (data_waiting) {
desc.reqevents |= APR_POLLOUT;