If anybody can spot a possible cause of a bus error on the Mac bots in this
commit, please let me know.
I'm unable to reproduce anything like this problem on Windows, FreeBSD,
OpenSUSE or Debian.
But all 4 Mac bots have produced a bus error since this revision.
(I wish I had known about this earlier, but the bot was down until last night.)
Bert
> -----Original Message-----
> From: [email protected] [mailto:[email protected]]
> Sent: woensdag 18 november 2015 15:46
> To: [email protected]
> Subject: svn commit: r1715005 - in /serf/trunk: incoming.c
> protocols/fcgi_protocol.c protocols/http2_protocol.c pump.c serf_private.h
>
> Author: rhuijben
> Date: Wed Nov 18 14:45:58 2015
> New Revision: 1715005
>
> URL: http://svn.apache.org/viewvc?rev=1715005&view=rev
> Log:
> Introduce a 'pump' layer that contains the stream pumping logic that was
> originally only part of outgoing.c, but is now partially duplicated in
> incoming.c.
>
> The implementation is currently directly copied (with svn history) from
> the outgoing connection, but this patch only uses it for the incoming
> connections yet.
>
> * io.c
> New file copied from outgoing.c. Removing parts that are not necessary
> and making things serf private where needed.
This file was actually committed as pump.c; the log message was propedited later.
>
> * incoming.c
> (client_detect_eof): Remove function.
> (client_connected): Use several pump functions to avoid duplicated code.
> (http1_enqueue_reponse,
> perform_peek_protocol,
> read_from_client): Update usage.
> (socket_writev,
> no_more_writes): Remove functions.
> (serf__incoming_client_flush): Replace implementation with call to
> serf_pump__write().
> (serf_incoming_set_framing_type): Update usage.
> (serf_incoming_create2): Init pump.
> (serf__incoming_update_pollset): Use data pending helper.
>
> * protocols/fcgi_protocol.c
> (fcgi_server_read,
> fcgi_server_write,
> fcgi_server_teardown): Update usage.
>
> * protocols/http2_protocol.c
> (serf__http2_protocol_init_server,
> http2_incoming_read): Update usage.
>
> * pump.c
> (pump_cleanup): New function.
> (serf_pump__init): New function.
>
> (data_pending): Turn into...
> (serf_pump__data_pending): ... this.
>
> (detect_eof): Use extended baton. Return final EOF.
>
> (do_conn_setup): Split into...
> (serf_pump__prepare_setup): ... this and
> (serf_pump__complete_setup): ... this.
>
> (serf__connection_flush): Tweak to...
> (serf_pump__write): ... this.
>
> * serf_private.h
> (serf_pump_io_t): New struct.
>
> Added:
> serf/trunk/pump.c
> - copied, changed from r1714992, serf/trunk/outgoing.c
> Modified:
> serf/trunk/incoming.c
> serf/trunk/protocols/fcgi_protocol.c
> serf/trunk/protocols/http2_protocol.c
> serf/trunk/serf_private.h
>
> Modified: serf/trunk/incoming.c
> URL:
> http://svn.apache.org/viewvc/serf/trunk/incoming.c?rev=1715005&r1=1715
> 004&r2=1715005&view=diff
> ==========================================================
> ====================
> --- serf/trunk/incoming.c (original)
> +++ serf/trunk/incoming.c Wed Nov 18 14:45:58 2015
> @@ -28,76 +28,35 @@
>
> #include "serf_private.h"
>
> -static apr_status_t client_detect_eof(void *baton,
> - serf_bucket_t *aggregator)
> -{
> - serf_incoming_t *client = baton;
> - client->hit_eof = true;
> - return APR_EAGAIN;
> -}
> -
> static apr_status_t client_connected(serf_incoming_t *client)
> {
> /* serf_context_t *ctx = client->ctx; */
> apr_status_t status;
> serf_bucket_t *ostream;
> - apr_sockaddr_t *sa;
>
> - if (apr_socket_addr_get(&sa, APR_LOCAL, client->skt) == APR_SUCCESS) {
> - char buf[48];
> - if (!apr_sockaddr_ip_getbuf(buf, sizeof(buf), sa))
> - serf_config_set_stringf(client->config,
> SERF_CONFIG_CONN_LOCALIP,
> - "%s:%d", buf, sa->port);
> - }
> - if (apr_socket_addr_get(&sa, APR_REMOTE, client->skt) ==
> APR_SUCCESS) {
> - char buf[48];
> - if (!apr_sockaddr_ip_getbuf(buf, sizeof(buf), sa))
> - serf_config_set_stringf(client->config,
> SERF_CONFIG_CONN_REMOTEIP,
> - "%s:%d", buf, sa->port);
> - }
> + serf_pump__store_ipaddresses_in_config(&client->pump);
>
> serf__log(LOGLVL_DEBUG, LOGCOMP_CONN, __FILE__, client->config,
> "socket for client 0x%x connected\n", client);
>
> /* ### Connection does auth setup here */
>
> - if (client->ostream_head == NULL) {
> - client->ostream_head = serf_bucket_aggregate_create(client-
> >allocator);
> - }
> -
> - if (client->ostream_tail == NULL) {
> - client->ostream_tail =
> serf_bucket_aggregate_create(client->allocator);
> -
> - serf_bucket_aggregate_hold_open(client->ostream_tail,
> - client_detect_eof, client);
> - }
> + serf_pump__prepare_setup(&client->pump);
>
> - ostream = client->ostream_tail;
> + ostream = client->pump.ostream_tail;
>
> status = client->setup(client->skt,
> - &client->stream,
> + &client->pump.stream,
> &ostream,
> client->setup_baton, client->pool);
>
> if (status) {
> - /* extra destroy here since it wasn't added to the head bucket yet.
> */
> - serf_bucket_destroy(client->ostream_tail);
> + serf_pump__complete_setup(&client->pump, NULL);
> /* ### Cleanup! (serf__connection_pre_cleanup) */
> return status;
> }
>
> - /* Share the configuration with all the buckets in the newly created
> output
> - chain (see PLAIN or ENCRYPTED scenario's), including the request buckets
> - created by the application (ostream_tail will handle this for us). */
> - serf_bucket_set_config(client->ostream_head, client->config);
> -
> - /* Share the configuration with the ssl_decrypt and socket buckets. The
> - response buckets wrapping the ssl_decrypt/socket buckets won't get the
> - config automatically because they are upstream. */
> - serf_bucket_set_config(client->stream, client->config);
> -
> - serf_bucket_aggregate_append(client->ostream_head,
> - ostream);
> + serf_pump__complete_setup(&client->pump, ostream);
>
> if (client->framing_type == SERF_CONNECTION_FRAMING_TYPE_NONE) {
> client->proto_peek_bkt = serf_bucket_aggregate_create(
> @@ -105,7 +64,7 @@ static apr_status_t client_connected(ser
>
> serf_bucket_aggregate_append(
> client->proto_peek_bkt,
> - serf_bucket_barrier_create(client->stream,
> + serf_bucket_barrier_create(client->pump.stream,
> client->allocator));
> }
>
> @@ -140,7 +99,7 @@ static apr_status_t http1_enqueue_repons
> void *enqueue_baton,
> serf_bucket_t *bucket)
> {
> - serf_bucket_aggregate_append(request->incoming->ostream_tail,
> + serf_bucket_aggregate_append(request->incoming-
> >pump.ostream_tail,
> serf__bucket_event_create(bucket,
> request,
> NULL,
> @@ -194,7 +153,7 @@ apr_status_t perform_peek_protocol(serf_
>
> if (!peek_data) {
>
> - status = serf_bucket_peek(client->stream, &data, &len);
> + status = serf_bucket_peek(client->pump.stream, &data, &len);
>
> if (len > h2prefixlen)
> len = h2prefixlen;
> @@ -227,7 +186,7 @@ apr_status_t perform_peek_protocol(serf_
> }
>
> do {
> - status = serf_bucket_read(client->stream,
> + status = serf_bucket_read(client->pump.stream,
> h2prefixlen - peek_data->read,
> &data, &len);
>
> @@ -314,7 +273,7 @@ static apr_status_t read_from_client(ser
> client->proto_peek_bkt = NULL;
> }
> else
> - read_bkt = serf_bucket_barrier_create(client->stream,
> + read_bkt = serf_bucket_barrier_create(client->pump.stream,
> client->allocator);
>
> status = client->req_setup(&rq->req_bkt, read_bkt, rq,
> @@ -355,7 +314,7 @@ static apr_status_t read_from_client(ser
> const char *data;
> apr_size_t len;
>
> - status = serf_bucket_peek(client->stream, &data, &len);
> + status = serf_bucket_peek(client->pump.stream, &data, &len);
> }
> }
> }
> @@ -390,144 +349,10 @@ static apr_status_t read_from_client(ser
> return status;
> }
>
> -static apr_status_t socket_writev(serf_incoming_t *client)
> -{
> - apr_size_t written;
> - apr_status_t status;
> -
> - status = apr_socket_sendv(client->skt, client->vec,
> - client->vec_len, &written);
> - if (status && !APR_STATUS_IS_EAGAIN(status))
> - serf__log(LOGLVL_DEBUG, LOGCOMP_CONN, __FILE__, client->config,
> - "socket_sendv error %d\n", status);
> -
> - /* did we write everything? */
> - if (written) {
> - apr_size_t len = 0;
> - int i;
> -
> - serf__log(LOGLVL_DEBUG, LOGCOMP_CONN, __FILE__, client->config,
> - "--- socket_sendv: %d bytes. --\n", written);
> -
> - for (i = 0; i < client->vec_len; i++) {
> - len += client->vec[i].iov_len;
> - if (written < len) {
> - serf__log_nopref(LOGLVL_DEBUG, LOGCOMP_RAWMSG, client-
> >config,
> - "%.*s", client->vec[i].iov_len - (len -
> written),
> - client->vec[i].iov_base);
> - if (i) {
> - memmove(client->vec, &client->vec[i],
> - sizeof(struct iovec) * (client->vec_len - i));
> - client->vec_len -= i;
> - }
> - client->vec[0].iov_base = (char *)client->vec[0].iov_base +
> (client-
> >vec[0].iov_len - (len - written));
> - client->vec[0].iov_len = len - written;
> - break;
> - } else {
> - serf__log_nopref(LOGLVL_DEBUG, LOGCOMP_RAWMSG, client-
> >config,
> - "%.*s",
> - client->vec[i].iov_len,
> client->vec[i].iov_base);
> - }
> - }
> - if (len == written) {
> - client->vec_len = 0;
> - }
> - serf__log_nopref(LOGLVL_DEBUG, LOGCOMP_RAWMSG, client-
> >config, "\n");
> -
> - /* Log progress information */
> - serf__context_progress_delta(client->ctx, 0, written);
> - }
> -
> - return status;
> -}
> -
> -static apr_status_t no_more_writes(serf_incoming_t *client)
> -{
> - /* Note that we should hold new requests until we open our new socket.
> */
> - serf__log(LOGLVL_DEBUG, LOGCOMP_CONN, __FILE__, client->config,
> - "stop writing on client 0x%x\n", client);
> -
> - /* Clear our iovec. */
> - client->vec_len = 0;
> -
> - /* Update the pollset to know we don't want to write on this socket any
> - * more.
> - */
> - serf_io__set_pollset_dirty(&client->io);
> - return APR_SUCCESS;
> -}
> -
> apr_status_t serf__incoming_client_flush(serf_incoming_t *client,
> bool pump)
> {
> - apr_status_t status = APR_SUCCESS;
> - apr_status_t read_status = APR_SUCCESS;
> - serf_bucket_t *ostreamh = client->ostream_head;
> -
> - client->hit_eof = FALSE;
> -
> - while (status == APR_SUCCESS) {
> -
> - /* First try to write out what is already stored in the
> - connection vecs. */
> - while (client->vec_len && !status) {
> - status = socket_writev(client);
> -
> - /* If the write would have blocked, then we're done.
> - * Don't try to write anything else to the socket.
> - */
> - if (APR_STATUS_IS_EPIPE(status)
> - || APR_STATUS_IS_ECONNRESET(status)
> - || APR_STATUS_IS_ECONNABORTED(status))
> - return no_more_writes(client);
> - }
> -
> - if (status || !pump)
> - return status;
> - else if (read_status || client->vec_len || client->hit_eof)
> - return read_status;
> -
> - /* ### optimize at some point by using read_for_sendfile */
> - /* TODO: now that read_iovec will effectively try to return as much
> - data as available, we probably don't want to read ALL_AVAIL, but
> - a lower number, like the size of one or a few TCP packets, the
> - available TCP buffer size ... */
> - client->hit_eof = 0;
> - read_status = serf_bucket_read_iovec(ostreamh,
> - SERF_READ_ALL_AVAIL,
> - IOV_MAX,
> - client->vec,
> - &client->vec_len);
> -
> - if (read_status == SERF_ERROR_WAIT_CONN) {
> - /* The bucket told us that it can't provide more data until
> - more data is read from the socket. This normally happens
> - during a SSL handshake.
> -
> - We should avoid looking for writability for a while so
> - that (hopefully) something will appear in the bucket so
> - we can actually write something. otherwise, we could
> - end up in a CPU spin: socket wants something, but we
> - don't have anything (and keep returning EAGAIN) */
> - client->stop_writing = true;
> - serf_io__set_pollset_dirty(&client->io);
> -
> - read_status = APR_EAGAIN;
> - }
> - else if (APR_STATUS_IS_EAGAIN(read_status)) {
> -
> - /* We read some stuff, but did we read everything ? */
> - if (client->hit_eof)
> - read_status = APR_SUCCESS;
> - }
> - else if (SERF_BUCKET_READ_ERROR(read_status)) {
> -
> - /* Something bad happened. Propagate any errors. */
> - return read_status;
> - }
> - }
> -
> - return status;
> + return serf_pump__write(&client->pump, pump);
> }
>
> static apr_status_t write_to_client(serf_incoming_t *client)
> @@ -561,7 +386,7 @@ void serf_incoming_set_framing_type(
>
> if (client->skt) {
> serf_io__set_pollset_dirty(&client->io);
> - client->stop_writing = 0;
> + client->pump.stop_writing = false;
>
> /* Close down existing protocol */
> if (client->protocol_baton && client->perform_teardown) {
> @@ -745,10 +570,16 @@ apr_status_t serf_incoming_create2(
> ic->closed = closed;
> ic->closed_baton = closed_baton;
>
> - /* A bucket wrapped around our socket (for reading responses). */
> - ic->stream = NULL;
> - ic->ostream_head = NULL;
> - ic->ostream_tail = NULL;
> + /* Store the connection specific info in the configuration store */
> + rv = serf__config_store_get_client_config(ctx, ic, &config, pool);
> + if (rv) {
> + apr_pool_destroy(ic->pool);
> + return rv;
> + }
> + ic->config = config;
> +
> + /* Prepare wrapping the socket with buckets. */
> + serf_pump__init(&ic->pump, &ic->io, ic->skt, config, ic->allocator, ic-
> >pool);
>
> ic->protocol_baton = NULL;
> ic->perform_read = read_from_client;
> @@ -762,14 +593,6 @@ apr_status_t serf_incoming_create2(
> ic->desc.reqevents = APR_POLLIN | APR_POLLERR | APR_POLLHUP;
> ic->seen_in_pollset = 0;
>
> - /* Store the connection specific info in the configuration store */
> - rv = serf__config_store_get_client_config(ctx, ic, &config, pool);
> - if (rv) {
> - apr_pool_destroy(ic->pool);
> - return rv;
> - }
> - ic->config = config;
> -
> rv = ctx->pollset_add(ctx->pollset_baton,
> &ic->desc, &ic->io);
>
> @@ -928,38 +751,8 @@ apr_status_t serf__incoming_update_polls
> But it also has the nice side effect of removing references
> from the aggregate to requests that are done.
> */
> - if (client->vec_len) {
> - /* We still have vecs in the connection, which lifetime is
> - managed by buckets inside client->ostream_head.
> -
> - Don't touch ostream as that might destroy the vecs */
> -
> - data_waiting = true;
> - }
> - else {
> - serf_bucket_t *ostream;
> -
> - ostream = client->ostream_head;
>
> - if (ostream) {
> - const char *dummy_data;
> - apr_size_t len;
> -
> - status = serf_bucket_peek(ostream, &dummy_data, &len);
> -
> - if (SERF_BUCKET_READ_ERROR(status) || len > 0) {
> - /* DATA or error waiting */
> - data_waiting = TRUE; /* Error waiting */
> - }
> - else if (! status || APR_STATUS_IS_EOF(status)) {
> - data_waiting = FALSE;
> - }
> - else
> - data_waiting = FALSE; /* EAGAIN / EOF / WAIT_CONN */
> - }
> - else
> - data_waiting = FALSE;
> - }
> + data_waiting = serf_pump__data_pending(&client->pump);
>
> if (data_waiting) {
> desc.reqevents |= APR_POLLOUT;
>
> Modified: serf/trunk/protocols/fcgi_protocol.c
> URL:
> http://svn.apache.org/viewvc/serf/trunk/protocols/fcgi_protocol.c?rev=171
> 5005&r1=1715004&r2=1715005&view=diff
> ==========================================================
> ====================
> --- serf/trunk/protocols/fcgi_protocol.c (original)
> +++ serf/trunk/protocols/fcgi_protocol.c Wed Nov 18 14:45:58 2015
> @@ -562,8 +562,8 @@ static apr_status_t fcgi_server_read(ser
> serf_fcgi_protocol_t *fcgi = client->protocol_baton;
>
> if (! fcgi->stream) {
> - fcgi->stream = client->stream;
> - fcgi->ostream = client->ostream_tail;
> + fcgi->stream = client->pump.stream;
> + fcgi->ostream = client->pump.ostream_tail;
> }
>
> return fcgi_read(fcgi);
> @@ -574,8 +574,8 @@ static apr_status_t fcgi_server_write(se
> serf_fcgi_protocol_t *fcgi = client->protocol_baton;
>
> if (!fcgi->stream) {
> - fcgi->stream = client->stream;
> - fcgi->ostream = client->ostream_tail;
> + fcgi->stream = client->pump.stream;
> + fcgi->ostream = client->pump.ostream_tail;
> }
>
> return fcgi_write(fcgi);
> @@ -606,8 +606,8 @@ void serf__fcgi_protocol_init_server(ser
> fcgi->pool = protocol_pool;
> fcgi->client = client;
> fcgi->io = &client->io;
> - fcgi->stream = client->stream;
> - fcgi->ostream = client->ostream_tail;
> + fcgi->stream = client->pump.stream;
> + fcgi->ostream = client->pump.ostream_tail;
> fcgi->allocator = client->allocator;
> fcgi->config = client->config;
>
>
> Modified: serf/trunk/protocols/http2_protocol.c
> URL:
> http://svn.apache.org/viewvc/serf/trunk/protocols/http2_protocol.c?rev=1
> 715005&r1=1715004&r2=1715005&view=diff
> ==========================================================
> ====================
> --- serf/trunk/protocols/http2_protocol.c (original)
> +++ serf/trunk/protocols/http2_protocol.c Wed Nov 18 14:45:58 2015
> @@ -331,8 +331,8 @@ void serf__http2_protocol_init_server(se
> h2->pool = protocol_pool;
> h2->client = client;
> h2->io = &client->io;
> - h2->stream = client->stream;
> - h2->ostream = client->ostream_tail;
> + h2->stream = client->pump.stream;
> + h2->ostream = client->pump.ostream_tail;
> h2->allocator = client->allocator;
> h2->config = client->config;
>
> @@ -1728,9 +1728,9 @@ http2_incoming_read(serf_incoming_t *cli
>
> /* If the stop_writing flag was set on the connection, reset it now
> because
> there is some data to read. */
> - if (client->stop_writing)
> + if (client->pump.stop_writing)
> {
> - client->stop_writing = 0;
> + client->pump.stop_writing = false;
> serf_io__set_pollset_dirty(&client->io);
> }
>
> @@ -1740,7 +1740,7 @@ http2_incoming_read(serf_incoming_t *cli
> if (client->proto_peek_bkt)
> stream = client->proto_peek_bkt;
> else
> - stream = client->stream;
> + stream = client->pump.stream;
>
> do {
> const char *data;
> @@ -1766,7 +1766,7 @@ http2_incoming_read(serf_incoming_t *cli
> serf_bucket_destroy(client->proto_peek_bkt);
> client->proto_peek_bkt = NULL;
>
> - h2->stream = client->stream;
> + h2->stream = client->pump.stream;
> }
>
> if (APR_STATUS_IS_EAGAIN(status) || status ==
> SERF_ERROR_WAIT_CONN)
>
> Copied: serf/trunk/pump.c (from r1714992, serf/trunk/outgoing.c)
> URL:
> http://svn.apache.org/viewvc/serf/trunk/pump.c?p2=serf/trunk/pump.c&p
> 1=serf/trunk/outgoing.c&r1=1714992&r2=1715005&rev=1715005&view=diff
> ==========================================================
> ====================
> --- serf/trunk/outgoing.c (original)
> +++ serf/trunk/pump.c Wed Nov 18 14:45:58 2015
> @@ -29,353 +29,111 @@
>
> #include "serf_private.h"
>
> -/* forward definitions */
> -static apr_status_t read_from_connection(serf_connection_t *conn);
> -static apr_status_t write_to_connection(serf_connection_t *conn);
> -static apr_status_t hangup_connection(serf_connection_t *conn);
> -
> -#define REQS_IN_PROGRESS(conn) \
> - ((conn)->completed_requests - (conn)->completed_responses)
> -
> -/* cleanup for sockets */
> -static apr_status_t clean_skt(void *data)
> +apr_status_t pump_cleanup(void *baton)
> {
> - serf_connection_t *conn = data;
> - apr_status_t status = APR_SUCCESS;
> + serf_pump_t *pump = baton;
>
> - if (conn->skt) {
> - status = apr_socket_close(conn->skt);
> - conn->skt = NULL;
> - serf__log(LOGLVL_DEBUG, LOGCOMP_CONN, __FILE__, conn->config,
> - "closed socket, status %d\n", status);
> - serf_config_remove_value(conn->config,
> SERF_CONFIG_CONN_LOCALIP);
> - serf_config_remove_value(conn->config,
> SERF_CONFIG_CONN_REMOTEIP);
> + if (pump->ostream_head != NULL) {
> +#ifdef SERF_DEBUG_BUCKET_USE
> + serf__bucket_drain(conn->ostream_head);
> +#endif
> + serf_bucket_destroy(pump->ostream_head);
> + pump->ostream_head = NULL;
> + pump->ostream_tail = NULL;
> }
>
> - return status;
> + return APR_SUCCESS;
> }
>
> -/* cleanup for conns */
> -static apr_status_t clean_conn(void *data)
> -{
> - serf_connection_t *conn = data;
> -
> - serf__log(LOGLVL_DEBUG, LOGCOMP_CONN, __FILE__, conn->config,
> - "cleaning up connection 0x%x\n", conn);
> - serf_connection_close(conn);
> +void serf_pump__init(serf_pump_t *pump,
> + serf_io_baton_t *io,
> + apr_socket_t *skt,
> + serf_config_t *config,
> + serf_bucket_alloc_t *allocator,
> + apr_pool_t *pool)
> +{
> + memset(pump, 0, sizeof(*pump));
> +
> + pump->io = io;
> + pump->allocator = allocator;
> + pump->config = config;
> + pump->skt = skt;
>
> - return APR_SUCCESS;
> + apr_pool_cleanup_register(pool, pump, pump_cleanup,
> + apr_pool_cleanup_null);
> }
>
> /* Safely check if there is still data pending on the connection, carefull
> to not accidentally make it invalid. */
> -static int
> -data_pending(serf_connection_t *conn)
> +bool serf_pump__data_pending(serf_pump_t *pump)
> {
> - if (conn->vec_len > 0)
> + if (pump->vec_len > 0)
> return TRUE; /* We can't poll right now! */
>
> - if (conn->ostream_head) {
> - const char *dummy;
> + if (pump->ostream_head) {
> + const char *data;
> apr_size_t len;
> apr_status_t status;
>
> - status = serf_bucket_peek(conn->ostream_head, &dummy,
> - &len);
> + status = serf_bucket_peek(pump->ostream_head, &data, &len);
> if (!SERF_BUCKET_READ_ERROR(status)) {
> if (len > 0) {
> - serf__log(LOGLVL_DEBUG, LOGCOMP_CONN, __FILE__, conn-
> >config,
> + serf__log(LOGLVL_DEBUG, LOGCOMP_CONN, __FILE__, pump-
> >config,
> "Extra data to be written after sending complete "
> "requests.\n");
> - return TRUE;
> + return true;
> }
> }
> else
> - return TRUE; /* Sure, we have data (an error) */
> - }
> -
> - return FALSE;
> -}
> -
> -static int
> -request_pending(serf_request_t **next_req, serf_connection_t *conn)
> -{
> - /* Prepare the next request */
> - if (conn->framing_type != SERF_CONNECTION_FRAMING_TYPE_NONE
> - && (conn->pipelining || (!conn->pipelining &&
> REQS_IN_PROGRESS(conn) == 0)))
> - {
> - /* Skip all requests that have been written completely but we're
> still
> - waiting for a response. */
> - serf_request_t *request = conn->unwritten_reqs;
> -
> - if (next_req)
> - *next_req = request;
> -
> - if (request != NULL) {
> - return TRUE;
> - }
> + return true; /* Sure, we have data (an error) */
> }
> - else if (next_req)
> - *next_req = NULL;
> -
> - return FALSE;
> -}
> -
> -/* Check if there is data waiting to be sent over the socket. This can happen
> - in two situations:
> - - The connection queue has atleast one request with unwritten data.
> - - All requests are written and the ssl layer wrote some data while reading
> - the response. This can happen when the server triggers a renegotiation,
> - e.g. after the first and only request on that connection was received.
> - Returns 1 if data is pending on CONN, NULL if not.
> - If NEXT_REQ is not NULL, it will be filled in with the next available
> request
> - with unwritten data. */
> -static int
> -request_or_data_pending(serf_request_t **next_req, serf_connection_t
> *conn)
> -{
> - if (request_pending(next_req, conn))
> - return TRUE;
> -
> - return data_pending(conn);
> -}
> -
> -/* Update the pollset for this connection. We tweak the pollset based on
> - * whether we want to read and/or write, given conditions within the
> - * connection. If the connection is not (yet) in the pollset, then it
> - * will be added.
> - */
> -apr_status_t serf__conn_update_pollset(serf_connection_t *conn)
> -{
> - serf_context_t *ctx = conn->ctx;
> - apr_status_t status;
> - apr_pollfd_t desc = { 0 };
> - int data_waiting;
>
> - if (!conn->skt) {
> - return APR_SUCCESS;
> - }
> -
> - /* Remove the socket from the poll set. */
> - desc.desc_type = APR_POLL_SOCKET;
> - desc.desc.s = conn->skt;
> - desc.reqevents = conn->io.reqevents;
> -
> - status = ctx->pollset_rm(ctx->pollset_baton,
> - &desc, &conn->io);
> - if (status && !APR_STATUS_IS_NOTFOUND(status))
> - return status;
> -
> - /* Now put it back in with the correct read/write values. */
> - desc.reqevents = APR_POLLHUP | APR_POLLERR;
> -
> - /* If we are not connected yet, we just want to know when we are */
> - if (conn->wait_for_connect) {
> - data_waiting = TRUE;
> - desc.reqevents |= APR_POLLOUT;
> - }
> - else {
> - /* Directly look at the connection data. While this may look
> - more expensive than the cheap checks later this peek is
> - just checking a bit of ram.
> -
> - But it also has the nice side effect of removing references
> - from the aggregate to requests that are done.
> - */
> - if (conn->vec_len) {
> - /* We still have vecs in the connection, which lifetime is
> - managed by buckets inside conn->ostream_head.
> -
> - Don't touch ostream as that might destroy the vecs */
> -
> - data_waiting = (conn->state != SERF_CONN_CLOSING);
> - }
> - else {
> - serf_bucket_t *ostream;
> -
> - ostream = conn->ostream_head;
> -
> - if (!ostream)
> - ostream = conn->ssltunnel_ostream;
> -
> - if (ostream) {
> - const char *dummy_data;
> - apr_size_t len;
> -
> - status = serf_bucket_peek(ostream, &dummy_data, &len);
> -
> - if (SERF_BUCKET_READ_ERROR(status) || len > 0) {
> - /* DATA or error waiting */
> - data_waiting = TRUE; /* Error waiting */
> - }
> - else if (! status || APR_STATUS_IS_EOF(status)) {
> - data_waiting = FALSE;
> - }
> - else
> - data_waiting = FALSE; /* EAGAIN / EOF / WAIT_CONN */
> - }
> - else
> - data_waiting = FALSE;
> - }
> -
> - if (data_waiting) {
> - desc.reqevents |= APR_POLLOUT;
> - }
> - }
> -
> - if ((conn->written_reqs || conn->unwritten_reqs) &&
> - conn->state != SERF_CONN_INIT) {
> - /* If there are any outstanding events, then we want to read. */
> - /* ### not true. we only want to read IF we have sent some data */
> - desc.reqevents |= APR_POLLIN;
> -
> - /* Don't write if OpenSSL told us that it needs to read data first.
> */
> - if (! conn->stop_writing && !data_waiting) {
> -
> - /* This check is duplicated in write_to_connection() */
> - if ((conn->probable_keepalive_limit &&
> - conn->completed_requests > conn->probable_keepalive_limit)
> ||
> - (conn->max_outstanding_requests &&
> - REQS_IN_PROGRESS(conn) >= conn->max_outstanding_requests))
> {
> -
> - /* we wouldn't try to write any way right now. */
> - }
> - else if (request_pending(NULL, conn)) {
> - desc.reqevents |= APR_POLLOUT;
> - }
> - }
> - }
> -
> - /* If we can have async responses, always look for something to read. */
> - if (conn->framing_type != SERF_CONNECTION_FRAMING_TYPE_HTTP1
> - || conn->async_responses)
> - {
> - desc.reqevents |= APR_POLLIN;
> - }
> -
> - /* save our reqevents, so we can pass it in to remove later. */
> - conn->io.reqevents = desc.reqevents;
> -
> - /* Note: even if we don't want to read/write this socket, we still
> - * want to poll it for hangups and errors.
> - */
> - return ctx->pollset_add(ctx->pollset_baton,
> - &desc, &conn->io);
> + return false;
> }
>
> -#ifdef SERF_DEBUG_BUCKET_USE
> -
> -/* Make sure all response buckets were drained. */
> -static void check_buckets_drained(serf_connection_t *conn)
> +static apr_status_t detect_eof(void *baton, serf_bucket_t
> *aggregate_bucket)
> {
> - serf_request_t *request = conn->written_reqs;
> + serf_pump_t *pump = baton;
> + pump->hit_eof = true;
>
> - for ( ; request ; request = request->next ) {
> - if (request->resp_bkt != NULL) {
> - /* ### crap. can't do this. this allocator may have un-drained
> - * ### REQUEST buckets.
> - */
> - /* serf_debug__entered_loop(request->resp_bkt->allocator); */
> - /* ### for now, pretend we closed the conn (resets the tracking)
> */
> - serf_debug__closed_conn(request->resp_bkt->allocator);
> - }
> + if (pump->done_writing) {
> + pump->ostream_tail = NULL;
> + return APR_EOF;
> }
> + else
> + return APR_EAGAIN;
> }
>
> -#endif
> -
> -/* Destroys all outstanding write information, to allow cleanup of subpools
> - that may still have data in these buckets to continue */
> -void serf__connection_pre_cleanup(serf_connection_t *conn)
> +void serf_pump__prepare_setup(serf_pump_t *pump)
> {
> - serf_request_t *rq;
> - conn->vec_len = 0;
> -
> - if (conn->ostream_head != NULL) {
> -#ifdef SERF_DEBUG_BUCKET_USE
> - serf__bucket_drain(conn->ostream_head);
> -#endif
> - serf_bucket_destroy(conn->ostream_head);
> - conn->ostream_head = NULL;
> - conn->ostream_tail = NULL;
> - }
> - if (conn->ssltunnel_ostream != NULL) {
> - serf_bucket_destroy(conn->ssltunnel_ostream);
> - conn->ssltunnel_ostream = NULL;
> + if (pump->ostream_head == NULL) {
> + pump->ostream_head = serf_bucket_aggregate_create(pump-
> >allocator);
> }
>
> - /* Tell all written request that they are free to destroy themselves */
> - rq = conn->written_reqs;
> - while (rq != NULL) {
> - if (rq->writing == SERF_WRITING_STARTED
> - || rq->writing == SERF_WRITING_DONE) {
> + if (pump->ostream_tail == NULL) {
> + pump->ostream_tail = serf_bucket_aggregate_create(pump-
> >allocator);
>
> - rq->writing = SERF_WRITING_FINISHED;
> - }
> - rq = rq->next;
> + serf_bucket_aggregate_hold_open(pump->ostream_tail, detect_eof,
> pump);
> }
> -
> - /* Destroy the requests that were queued up to destroy later */
> - while ((rq = conn->done_reqs)) {
> - conn->done_reqs = rq->next;
> -
> - rq->writing = SERF_WRITING_FINISHED;
> - serf__destroy_request(rq);
> - }
> - conn->done_reqs = conn->done_reqs_tail = NULL;
> }
>
> -static apr_status_t detect_eof(void *baton, serf_bucket_t
> *aggregate_bucket)
> +void serf_pump__complete_setup(serf_pump_t *pump,
> + serf_bucket_t *ostream)
> {
> - serf_connection_t *conn = baton;
> - conn->hit_eof = 1;
> - return APR_EAGAIN;
> -}
> -
> -static apr_status_t do_conn_setup(serf_connection_t *conn)
> -{
> - apr_status_t status;
> - serf_bucket_t *ostream;
> -
> - /* ### dunno what the hell this is about. this latency stuff got
> - ### added, and who knows whether it should stay... */
> - conn->latency = apr_time_now() - conn->connect_time;
> -
> - if (conn->ostream_head == NULL) {
> - conn->ostream_head = serf_bucket_aggregate_create(conn-
> >allocator);
> - }
> -
> - if (conn->ostream_tail == NULL) {
> - conn->ostream_tail = serf_bucket_aggregate_create(conn->allocator);
> -
> - serf_bucket_aggregate_hold_open(conn->ostream_tail, detect_eof,
> conn);
> - }
> -
> - ostream = conn->ostream_tail;
> -
> - status = (*conn->setup)(conn->skt,
> - &conn->stream,
> - &ostream,
> - conn->setup_baton,
> - conn->pool);
> - if (status) {
> - /* extra destroy here since it wasn't added to the head bucket yet.
> */
> - serf_bucket_destroy(conn->ostream_tail);
> - serf__connection_pre_cleanup(conn);
> - return status;
> - }
> + if (ostream)
> + serf_bucket_aggregate_append(pump->ostream_head, ostream);
> + else
> + serf_bucket_aggregate_append(pump->ostream_head, pump-
> >ostream_tail);
>
> /* Share the configuration with all the buckets in the newly created
> output
> chain (see PLAIN or ENCRYPTED scenario's), including the request buckets
> created by the application (ostream_tail will handle this for us). */
> - serf_bucket_set_config(conn->ostream_head, conn->config);
> + serf_bucket_set_config(pump->ostream_head, pump->config);
>
> /* Share the configuration with the ssl_decrypt and socket buckets. The
> response buckets wrapping the ssl_decrypt/socket buckets won't get the
> config automatically because they are upstream. */
> - serf_bucket_set_config(conn->stream, conn->config);
> -
> - serf_bucket_aggregate_append(conn->ostream_head,
> - ostream);
> + serf_bucket_set_config(pump->stream, pump->config);
>
> /* We typically have one of two scenarios, based on whether the
> application decided to encrypt this connection:
> @@ -394,381 +152,53 @@ static apr_status_t do_conn_setup(serf_c
>
> where STREAM is an internal variant of AGGREGATE.
> */
> -
> - return status;
> }
>
> -/* Set up the input and output stream buckets.
> - When a tunnel over an http proxy is needed, create a socket bucket and
> - empty aggregate bucket for sending and receiving unencrypted requests
> - over the socket.
> -
> - After the tunnel is there, or no tunnel was needed, ask the application
> - to create the input and output buckets, which should take care of the
> - [en/de]cryption.
> - */
> -
> -static apr_status_t prepare_conn_streams(serf_connection_t *conn,
> - serf_bucket_t **ostreamt,
> - serf_bucket_t **ostreamh)
> +void serf_pump__store_ipaddresses_in_config(serf_pump_t *pump)
> {
> - apr_status_t status;
> + apr_sockaddr_t *sa;
>
> - /* Do we need a SSL tunnel first? */
> - if (conn->state == SERF_CONN_CONNECTED) {
> - /* If the connection does not have an associated bucket, then
> - * call the setup callback to get one.
> - */
> - if (conn->stream == NULL) {
> - status = do_conn_setup(conn);
> - if (status) {
> - return status;
> - }
> - }
> - *ostreamt = conn->ostream_tail;
> - *ostreamh = conn->ostream_head;
> - } else if (conn->state == SERF_CONN_SETUP_SSLTUNNEL) {
> -
> - /* SSL tunnel needed and not set up yet, get a direct unencrypted
> - stream for this socket */
> - if (conn->stream == NULL) {
> - conn->stream = serf_context_bucket_socket_create(conn->ctx,
> - conn->skt,
> -
> conn->allocator);
> - }
> -
> - /* Don't create the ostream bucket chain including the ssl_encrypt
> - bucket yet. This ensure the CONNECT request is sent unencrypted
> - to the proxy. */
> - *ostreamt = *ostreamh = conn->ssltunnel_ostream;
> - } else {
> - /* SERF_CONN_CLOSING or SERF_CONN_INIT */
> -
> - *ostreamt = conn->ostream_tail;
> - *ostreamh = conn->ostream_head;
> - }
> -
> - return APR_SUCCESS;
> -}
> -
> -static void store_ipaddresses_in_config(serf_config_t *config,
> - apr_socket_t *skt)
> -{
> - apr_sockaddr_t *sa;
> -
> - if (apr_socket_addr_get(&sa, APR_LOCAL, skt) == APR_SUCCESS) {
> + if (apr_socket_addr_get(&sa, APR_LOCAL, pump->skt) == APR_SUCCESS) {
> char buf[48];
> if (!apr_sockaddr_ip_getbuf(buf, sizeof(buf), sa))
> - serf_config_set_stringf(config, SERF_CONFIG_CONN_LOCALIP,
> + serf_config_set_stringf(pump->config, SERF_CONFIG_CONN_LOCALIP,
> "%s:%d", buf, sa->port);
> }
> - if (apr_socket_addr_get(&sa, APR_REMOTE, skt) == APR_SUCCESS) {
> + if (apr_socket_addr_get(&sa, APR_REMOTE, pump->skt) == APR_SUCCESS) {
> char buf[48];
> if (!apr_sockaddr_ip_getbuf(buf, sizeof(buf), sa))
> - serf_config_set_stringf(config, SERF_CONFIG_CONN_REMOTEIP,
> + serf_config_set_stringf(pump->config, SERF_CONFIG_CONN_REMOTEIP,
> "%s:%d", buf, sa->port);
> }
> }
>
> -static apr_status_t connect_connection(serf_connection_t *conn)
> -{
> - serf_context_t *ctx = conn->ctx;
> - apr_status_t status;
> -
> - store_ipaddresses_in_config(conn->config, conn->skt);
> -
> - serf__log(LOGLVL_DEBUG, LOGCOMP_CONN, __FILE__, conn->config,
> - "socket for conn 0x%x connected\n", conn);
> -
> - /* If the authentication was already started on another connection,
> - prepare this connection (it might be possible to skip some
> - part of the handshaking). */
> - if (ctx->proxy_address) {
> - status = serf__auth_setup_connection(PROXY, conn);
> - if (status) {
> - return status;
> - }
> - }
> -
> - status = serf__auth_setup_connection(HOST, conn);
> - if (status)
> - return status;
> -
> - /* Does this connection require a SSL tunnel over the proxy? */
> - if (ctx->proxy_address && strcmp(conn->host_info.scheme, "https") ==
> 0)
> - serf__ssltunnel_connect(conn);
> - else {
> - conn->state = SERF_CONN_CONNECTED;
> - status = do_conn_setup(conn);
> - }
> -
> - return APR_SUCCESS;
> -}
> -
> -/* Create and connect sockets for any connections which don't have them
> - * yet. This is the core of our lazy-connect behavior.
> - */
> -apr_status_t serf__open_connections(serf_context_t *ctx)
> -{
> - int i;
> -
> - for (i = ctx->conns->nelts; i--; ) {
> - serf_connection_t *conn = GET_CONN(ctx, i);
> - apr_status_t status;
> - apr_socket_t *skt;
> -
> - conn->seen_in_pollset = 0;
> -
> - if (conn->skt != NULL) {
> -#ifdef SERF_DEBUG_BUCKET_USE
> - check_buckets_drained(conn);
> -#endif
> - continue;
> - }
> -
> - /* Delay opening until we have something to deliver! */
> - if (conn->unwritten_reqs == NULL) {
> - continue;
> - }
> -
> - apr_pool_clear(conn->skt_pool);
> - status = apr_socket_create(&skt, conn->address->family,
> - SOCK_STREAM,
> -#if APR_MAJOR_VERSION > 0
> - APR_PROTO_TCP,
> -#endif
> - conn->skt_pool);
> - serf__log(LOGLVL_DEBUG, LOGCOMP_CONN, __FILE__, conn->config,
> - "created socket for conn 0x%x, status %d\n", conn, status);
> - if (status != APR_SUCCESS)
> - return status;
> -
> - apr_pool_cleanup_register(conn->skt_pool, conn, clean_skt,
> - apr_pool_cleanup_null);
> -
> - /* Set the socket to be non-blocking */
> - if ((status = apr_socket_timeout_set(skt, 0)) != APR_SUCCESS)
> - return status;
> -
> - /* Disable Nagle's algorithm */
> - if ((status = apr_socket_opt_set(skt,
> - APR_TCP_NODELAY, 1)) != APR_SUCCESS)
> - return status;
> -
> - /* Configured. Store it into the connection now. */
> - conn->skt = skt;
> -
> - /* Remember time when we started connecting to server to calculate
> - network latency. */
> - conn->connect_time = apr_time_now();
> -
> - /* Now that the socket is set up, let's connect it. This should
> - * return immediately.
> - */
> - status = apr_socket_connect(skt, conn->address);
> - if (status != APR_SUCCESS) {
> - if (!APR_STATUS_IS_EINPROGRESS(status))
> - return status;
> -
> - /* Keep track of when we really connect */
> - conn->wait_for_connect = TRUE;
> - }
> -
> - status = serf_config_set_string(conn->config,
> - SERF_CONFIG_CONN_PIPELINING,
> - (conn->max_outstanding_requests != 1 &&
> - conn->pipelining == 1) ? "Y" : "N");
> - if (status)
> - return status;
> -
> - /* Flag our pollset as dirty now that we have a new socket. */
> - serf_io__set_pollset_dirty(&conn->io);
> -
> - if (! conn->wait_for_connect) {
> - status = connect_connection(conn);
> -
> - if (status)
> - return status;
> - }
> - }
> -
> - return APR_SUCCESS;
> -}
> -
> -static apr_status_t no_more_writes(serf_connection_t *conn)
> +static apr_status_t no_more_writes(serf_pump_t *pump)
> {
> /* Note that we should hold new requests until we open our new socket. */
> - conn->state = SERF_CONN_CLOSING;
> - serf__log(LOGLVL_DEBUG, LOGCOMP_CONN, __FILE__, conn->config,
> - "stop writing on conn 0x%x\n", conn);
> + pump->done_writing = true;
> + serf__log(LOGLVL_DEBUG, LOGCOMP_CONN, __FILE__, pump->config,
> + "stop writing on 0x%x\n", pump->io->u.conn);
>
> /* Clear our iovec. */
> - conn->vec_len = 0;
> + pump->vec_len = 0;
>
> /* Update the pollset to know we don't want to write on this socket any
> * more.
> */
> - serf_io__set_pollset_dirty(&conn->io);
> - return APR_SUCCESS;
> -}
> -
> -/* Read the 'Connection' header from the response. Return
> SERF_ERROR_CLOSING if
> - * the header contains value 'close' indicating the server is closing the
> - * connection right after this response.
> - * Otherwise returns APR_SUCCESS.
> - */
> -static apr_status_t is_conn_closing(serf_bucket_t *response)
> -{
> - serf_bucket_t *hdrs;
> - const char *val;
> -
> - hdrs = serf_bucket_response_get_headers(response);
> - val = serf_bucket_headers_get(hdrs, "Connection");
> - if (val && strcasecmp("close", val) == 0)
> - {
> - return SERF_ERROR_CLOSING;
> - }
> -
> - return APR_SUCCESS;
> -}
> -
> -static apr_status_t remove_connection(serf_context_t *ctx,
> - serf_connection_t *conn)
> -{
> - apr_pollfd_t desc = { 0 };
> -
> - desc.desc_type = APR_POLL_SOCKET;
> - desc.desc.s = conn->skt;
> - desc.reqevents = conn->io.reqevents;
> -
> - return ctx->pollset_rm(ctx->pollset_baton,
> - &desc, &conn->io);
> -}
> -
> -/* A socket was closed, inform the application. */
> -static void handle_conn_closed(serf_connection_t *conn, apr_status_t
> status)
> -{
> - (*conn->closed)(conn, conn->closed_baton, status,
> - conn->pool);
> -}
> -
> -static apr_status_t reset_connection(serf_connection_t *conn,
> - int requeue_requests)
> -{
> - serf_context_t *ctx = conn->ctx;
> - apr_status_t status;
> - serf_request_t *old_reqs;
> -
> - serf__log(LOGLVL_DEBUG, LOGCOMP_CONN, __FILE__, conn->config,
> - "reset connection 0x%x\n", conn);
> -
> - conn->probable_keepalive_limit = conn->completed_responses;
> - conn->completed_requests = 0;
> - conn->completed_responses = 0;
> -
> - /* Clear the unwritten_reqs queue, so the application can requeue
> cancelled
> - requests on it for the new socket. */
> - old_reqs = conn->unwritten_reqs;
> - conn->unwritten_reqs = NULL;
> - conn->unwritten_reqs_tail = NULL;
> -
> - serf__connection_pre_cleanup(conn);
> -
> - /* First, cancel all written requests for which we haven't received a
> - response yet. Inform the application that the request is cancelled,
> - so it can requeue them if needed. */
> - while (conn->written_reqs) {
> - serf__cancel_request(conn->written_reqs, &conn->written_reqs,
> - requeue_requests);
> - }
> - conn->written_reqs_tail = NULL;
> -
> - /* Handle all outstanding unwritten requests.
> - TODO: what about a partially written request? */
> - while (old_reqs) {
> - /* If we haven't started to write the connection, bring it over
> - * unchanged to our new socket.
> - * Do not copy a CONNECT request to the new connection, the ssl
> tunnel
> - * setup code will create a new CONNECT request already.
> - */
> - if (requeue_requests && (old_reqs->writing == SERF_WRITING_NONE)
> &&
> - !old_reqs->ssltunnel) {
> -
> - serf_request_t *req = old_reqs;
> - old_reqs = old_reqs->next;
> - req->next = NULL;
> - serf__link_requests(&conn->unwritten_reqs,
> - &conn->unwritten_reqs_tail,
> - req);
> - }
> - else {
> - /* We don't want to requeue the request or this request was
> partially
> - written. Inform the application that the request is
> cancelled. */
> - serf__cancel_request(old_reqs, &old_reqs, requeue_requests);
> - }
> - }
> -
> - /* Requests queue has been prepared for a new socket, close the old
> one. */
> - if (conn->skt != NULL) {
> - remove_connection(ctx, conn);
> - status = clean_skt(conn);
> - if (conn->closed != NULL) {
> - handle_conn_closed(conn, status);
> - }
> - }
> -
> - if (conn->stream != NULL) {
> - serf_bucket_destroy(conn->stream);
> - conn->stream = NULL;
> - }
> -
> - /* Don't try to resume any writes */
> - conn->vec_len = 0;
> -
> - serf_io__set_pollset_dirty(&conn->io);
> - conn->state = SERF_CONN_INIT;
> -
> - conn->hit_eof = 0;
> - conn->connect_time = 0;
> - conn->latency = -1;
> - conn->stop_writing = 0;
> - conn->write_now = 0;
> - /* conn->pipelining */
> -
> - conn->framing_type = SERF_CONNECTION_FRAMING_TYPE_HTTP1;
> -
> - if (conn->protocol_baton) {
> - conn->perform_teardown(conn);
> - conn->protocol_baton = NULL;
> - }
> -
> - conn->perform_read = read_from_connection;
> - conn->perform_write = write_to_connection;
> - conn->perform_hangup = hangup_connection;
> - conn->perform_teardown = NULL;
> -
> - conn->status = APR_SUCCESS;
> -
> - /* Let our context know that we've 'reset' the socket already. */
> - conn->seen_in_pollset |= APR_POLLHUP;
> -
> - /* Recalculate the current list length */
> - conn->nr_of_written_reqs = 0;
> - conn->nr_of_unwritten_reqs = serf__req_list_length(conn-
> >unwritten_reqs);
> -
> - /* Found the connection. Closed it. All done. */
> + serf_io__set_pollset_dirty(pump->io);
> return APR_SUCCESS;
> }
>
> -static apr_status_t socket_writev(serf_connection_t *conn)
> +static apr_status_t socket_writev(serf_pump_t *pump)
> {
> apr_size_t written;
> apr_status_t status;
> + serf_pump_t *conn = pump;
>
> - status = apr_socket_sendv(conn->skt, conn->vec,
> - conn->vec_len, &written);
> + status = apr_socket_sendv(pump->skt, pump->vec,
> + pump->vec_len, &written);
> if (status && !APR_STATUS_IS_EAGAIN(status))
> - serf__log(LOGLVL_DEBUG, LOGCOMP_CONN, __FILE__, conn->config,
> + serf__log(LOGLVL_DEBUG, LOGCOMP_CONN, __FILE__, pump->config,
> "socket_sendv error %d\n", status);
>
> /* did we write everything? */
> @@ -805,18 +235,18 @@ static apr_status_t socket_writev(serf_c
> serf__log_nopref(LOGLVL_DEBUG, LOGCOMP_RAWMSG, conn->config, "\n");
>
> /* Log progress information */
> - serf__context_progress_delta(conn->ctx, 0, written);
> + serf__context_progress_delta(conn->io->ctx, 0, written);
> }
>
> return status;
> }
>
> -apr_status_t serf__connection_flush(serf_connection_t *conn,
> - int pump)
> +apr_status_t serf_pump__write(serf_pump_t *pump,
> + bool fetch_new)
> {
> apr_status_t status = APR_SUCCESS;
> apr_status_t read_status = APR_SUCCESS;
> - serf_bucket_t *ostreamh = NULL;
> + serf_pump_t *const conn = pump;
>
> conn->hit_eof = FALSE;
>
> @@ -841,23 +271,13 @@ apr_status_t serf__connection_flush(serf
> else if (read_status || conn->vec_len || conn->hit_eof)
> return read_status;
>
> - /* Ok, with the vecs written, we can now refill the per connection
> - output vecs */
> - if (!ostreamh) {
> - serf_bucket_t *ostreamt;
> -
> - status = prepare_conn_streams(conn, &ostreamt, &ostreamh);
> - if (status)
> - return status;
> - }
> -
> /* ### optimize at some point by using read_for_sendfile */
> /* TODO: now that read_iovec will effectively try to return as much
> data as available, we probably don't want to read ALL_AVAIL, but
> a lower number, like the size of one or a few TCP packets, the
> available TCP buffer size ... */
> conn->hit_eof = 0;
> - read_status = serf_bucket_read_iovec(ostreamh,
> + read_status = serf_bucket_read_iovec(pump->ostream_head,
> SERF_READ_ALL_AVAIL,
> IOV_MAX,
> conn->vec,
> @@ -873,8 +293,8 @@ apr_status_t serf__connection_flush(serf
> we can actually write something. otherwise, we could
> end up in a CPU spin: socket wants something, but we
> don't have anything (and keep returning EAGAIN) */
> - conn->stop_writing = 1;
> - serf_io__set_pollset_dirty(&conn->io);
> + conn->stop_writing = true;
> + serf_io__set_pollset_dirty(conn->io);
>
> read_status = APR_EAGAIN;
> }
> @@ -894,936 +314,3 @@ apr_status_t serf__connection_flush(serf
> return status;
> }
>
> -/* Implements serf_bucket_event_callback_t and is called (potentially
> - more than once) after the request buckets are completely read.
> -
> - At this time we know the request is written, but we can't destroy
> - the buckets yet as they might still be referenced by the connection
> - vecs. */
> -static apr_status_t request_writing_done(void *baton,
> - apr_uint64_t bytes_read)
> -{
> - serf_request_t *request = baton;
> -
> - if (request->writing == SERF_WRITING_STARTED) {
> - request->writing = SERF_WRITING_DONE;
> -
> - /* TODO: Handle request done */
> - }
> - return APR_EOF; /* Done with the event bucket */
> -}
> -
> -
> -/* Implements serf_bucket_event_callback_t and is called after the
> - request buckets are no longer needed. More precisely the outgoing
> - buckets are already destroyed. */
> -static apr_status_t request_writing_finished(void *baton,
> - apr_uint64_t bytes_read)
> -{
> - serf_request_t *request = baton;
> - serf_connection_t *conn = request->conn;
> -
> - request->req_bkt = NULL; /* Bucket is destroyed by now */
> -
> - if (request->writing == SERF_WRITING_DONE) {
> - request->writing = SERF_WRITING_FINISHED;
> -
> - /* Move the request to the written queue */
> - serf__link_requests(&conn->written_reqs, &conn->written_reqs_tail,
> - request);
> - conn->nr_of_written_reqs++;
> - conn->unwritten_reqs = conn->unwritten_reqs->next;
> - conn->nr_of_unwritten_reqs--;
> - request->next = NULL;
> -
> - /* If our connection has async responses enabled, we're not
> - * going to get a reply back, so kill the request.
> - */
> - if (conn->async_responses) {
> - conn->unwritten_reqs = request->next;
> - conn->nr_of_unwritten_reqs--;
> - serf__destroy_request(request);
> - }
> -
> - conn->completed_requests++;
> - }
> - /* Destroy (all) requests that are now safe to destroy,
> - Typically non or just the finished one */
> - {
> - serf_request_t *last = NULL;
> - serf_request_t **rq = &conn->done_reqs;
> - while (*rq) {
> - request = *rq;
> - if ((*rq)->writing == SERF_WRITING_FINISHED) {
> - request = *rq;
> - *rq = request->next;
> - serf__destroy_request(request);
> - }
> - else {
> - last = *rq;
> - rq = &last->next;
> - }
> - }
> -
> - conn->done_reqs_tail = last;
> - }
> -
> - return APR_EOF; /* Done with event bucket. Status is ignored */
> -}
> -
> -/* write data out to the connection */
> -static apr_status_t write_to_connection(serf_connection_t *conn)
> -{
> - /* Keep reading and sending until we run out of stuff to read, or
> - * writing would block.
> - */
> - while (1) {
> - serf_request_t *request;
> - apr_status_t status;
> - apr_status_t read_status;
> - serf_bucket_t *ostreamt;
> - serf_bucket_t *ostreamh;
> -
> - /* If we have unwritten data in iovecs, then write what we can
> - directly. */
> - status = serf__connection_flush(conn, FALSE);
> - if (APR_STATUS_IS_EAGAIN(status))
> - return APR_SUCCESS;
> - else if (status)
> - return status;
> -
> - /* If we're setting up an ssl tunnel, we can't send real requests
> - as yet, as they need to be encrypted and our encrypt buckets
> - aren't created yet as we still need to read the unencrypted
> - response of the CONNECT request. */
> - if (conn->state == SERF_CONN_SETUP_SSLTUNNEL
> - && REQS_IN_PROGRESS(conn) > 0)
> - {
> - /* But flush out SSL data when necessary! */
> - status = serf__connection_flush(conn, TRUE);
> - if (APR_STATUS_IS_EAGAIN(status))
> - return APR_SUCCESS;
> -
> - return status;
> - }
> -
> - /* We try to limit the number of in-flight requests so that we
> - don't have to repeat too many if the connection drops.
> -
> - This check matches that in serf__conn_update_pollset()
> - */
> - if ((conn->probable_keepalive_limit &&
> - conn->completed_requests > conn->probable_keepalive_limit) ||
> - (conn->max_outstanding_requests &&
> - REQS_IN_PROGRESS(conn) >= conn->max_outstanding_requests)) {
> -
> - serf_io__set_pollset_dirty(&conn->io);
> -
> - /* backoff for now. */
> - return APR_SUCCESS;
> - }
> -
> - /* We may need to move forward to a request which has something
> - * to write.
> - */
> - if (!request_or_data_pending(&request, conn)) {
> - /* No more requests (with data) are registered with the
> - * connection, and no data is pending on the outgoing stream.
> - * Let's update the pollset so that we don't try to write to this
> - * socket again.
> - */
> - serf_io__set_pollset_dirty(&conn->io);
> - return APR_SUCCESS;
> - }
> -
> - status = prepare_conn_streams(conn, &ostreamt, &ostreamh);
> - if (status) {
> - return status;
> - }
> -
> - if (request && request->writing == SERF_WRITING_NONE) {
> - serf_bucket_t *event_bucket;
> -
> - if (request->req_bkt == NULL) {
> - read_status = serf__setup_request(request);
> - if (read_status) {
> - /* Something bad happened. Propagate any errors. */
> - return read_status;
> - }
> - }
> -
> - request->writing = SERF_WRITING_STARTED;
> -
> - /* And now add an event bucket to keep track of when the request
> - has been completely written */
> - event_bucket = serf__bucket_event_create(request->req_bkt,
> - request,
> - NULL,
> - request_writing_done,
> -
> request_writing_finished,
> - conn->allocator);
> - serf_bucket_aggregate_append(ostreamt, event_bucket);
> - }
> -
> - /* If we got some data, then deliver it. */
> - /* ### what to do if we got no data?? is that a problem? */
> - status = serf__connection_flush(conn, TRUE);
> - if (APR_STATUS_IS_EAGAIN(status))
> - return APR_SUCCESS;
> - else if (status)
> - return status;
> -
> - }
> - /* NOTREACHED */
> -}
> -
> -
> -
> -/* An async response message was received from the server. */
> -static apr_status_t handle_async_response(serf_connection_t *conn,
> - apr_pool_t *pool)
> -{
> - apr_status_t status;
> -
> - if (conn->current_async_response == NULL) {
> - conn->current_async_response =
> - (*conn->async_acceptor)(NULL, conn->stream,
> - conn->async_acceptor_baton, pool);
> - }
> -
> - status = (*conn->async_handler)(NULL, conn->current_async_response,
> - conn->async_handler_baton, pool);
> -
> - if (APR_STATUS_IS_EOF(status)) {
> - serf_bucket_destroy(conn->current_async_response);
> - conn->current_async_response = NULL;
> - status = APR_SUCCESS;
> - }
> -
> - return status;
> -}
> -
> -/* read data from the connection */
> -static apr_status_t read_from_connection(serf_connection_t *conn)
> -{
> - apr_status_t status;
> - apr_pool_t *tmppool;
> - apr_status_t close_connection = APR_SUCCESS;
> -
> - /* Whatever is coming in on the socket corresponds to the first request
> - * on our chain.
> - */
> - serf_request_t *request = conn->written_reqs;
> - if (!request) {
> - /* Request wasn't completely written yet! */
> - request = conn->unwritten_reqs;
> - }
> -
> - /* If the stop_writing flag was set on the connection, reset it now
> because
> - there is some data to read. */
> - if (conn->stop_writing) {
> - conn->stop_writing = 0;
> - serf_io__set_pollset_dirty(&conn->io);
> - }
> -
> - /* assert: request != NULL */
> -
> - if ((status = apr_pool_create(&tmppool, conn->pool)) != APR_SUCCESS)
> - return status;
> -
> - /* Invoke response handlers until we have no more work. */
> - while (1) {
> - serf_bucket_t *dummy1, *dummy2;
> -
> - apr_pool_clear(tmppool);
> -
> - /* Only interested in the input stream here. */
> - status = prepare_conn_streams(conn, &dummy1, &dummy2);
> - if (status) {
> - goto error;
> - }
> -
> - /* We have a different codepath when we can have async responses. */
> - if (conn->async_responses) {
> - /* TODO What about socket errors? */
> - status = handle_async_response(conn, tmppool);
> - if (APR_STATUS_IS_EAGAIN(status)) {
> - status = APR_SUCCESS;
> - goto error;
> - }
> - if (status) {
> - goto error;
> - }
> - continue;
> - }
> -
> - /* We are reading a response for a request we haven't
> - * written yet!
> - *
> - * This shouldn't normally happen EXCEPT:
> - *
> - * 1) when the other end has closed the socket and we're
> - * pending an EOF return.
> - * 2) Doing the initial SSL handshake - we'll get EAGAIN
> - * as the SSL buckets will hide the handshake from us
> - * but not return any data.
> - * 3) When the server sends us an SSL alert.
> - *
> - * In these cases, we should not receive any actual user data.
> - *
> - * 4) When the server sends a error response, like 408 Request
> timeout.
> - * This response should be passed to the application.
> - *
> - * If we see an EOF (due to either an expired timeout or the server
> - * sending the SSL 'close notify' shutdown alert), we'll reset the
> - * connection and open a new one.
> - */
> - if (request->req_bkt || request->writing == SERF_WRITING_NONE) {
> - const char *data;
> - apr_size_t len;
> -
> - status = serf_bucket_peek(conn->stream, &data, &len);
> -
> - if (APR_STATUS_IS_EOF(status)) {
> - reset_connection(conn, 1);
> - status = APR_SUCCESS;
> - goto error;
> - }
> - else if (APR_STATUS_IS_EAGAIN(status) && !len) {
> - status = APR_SUCCESS;
> - goto error;
> - } else if (status && !APR_STATUS_IS_EAGAIN(status)) {
> - /* Read error */
> - goto error;
> - }
> -
> - /* Unexpected response from the server */
> - if (conn->write_now) {
> - conn->write_now = 0;
> - status = conn->perform_write(conn);
> -
> - if (!SERF_BUCKET_READ_ERROR(status))
> - status = APR_SUCCESS;
> - }
> - }
> -
> - if (conn->framing_type != SERF_CONNECTION_FRAMING_TYPE_HTTP1)
> - break;
> -
> - /* If the request doesn't have a response bucket, then call the
> - * acceptor to get one created.
> - */
> - if (request->resp_bkt == NULL) {
> - if (! request->acceptor) {
> - /* Request wasn't even setup.
> - Server replying before it received anything? */
> - return SERF_ERROR_BAD_HTTP_RESPONSE;
> - }
> -
> - request->resp_bkt = (*request->acceptor)(request, conn->stream,
> - request->acceptor_baton,
> - tmppool);
> - apr_pool_clear(tmppool);
> -
> - /* Share the configuration with the response bucket(s) */
> - serf_bucket_set_config(request->resp_bkt, conn->config);
> - }
> -
> - status = serf__handle_response(request, tmppool);
> -
> - /* If we received APR_SUCCESS, run this loop again. */
> - if (!status) {
> - continue;
> - }
> -
> - /* If our response handler says it can't do anything more, we now
> - * treat that as a success.
> - */
> - if (APR_STATUS_IS_EAGAIN(status)) {
> - /* It is possible that while reading the response, the ssl layer
> - has prepared some data to send. If this was the last request,
> - serf will not check for socket writability, so force this
> here.
> - */
> - if (request_or_data_pending(&request, conn) && !request) {
> - serf_io__set_pollset_dirty(&conn->io);
> - }
> - status = APR_SUCCESS;
> - goto error;
> - }
> -
> - close_connection = is_conn_closing(request->resp_bkt);
> -
> - if (!APR_STATUS_IS_EOF(status) &&
> - close_connection != SERF_ERROR_CLOSING) {
> - /* Whether success, or an error, there is no more to do unless
> - * this request has been completed.
> - */
> - goto error;
> - }
> -
> - /* The response has been fully-read, so that means the request has
> - * either been fully-delivered (most likely), or that we don't need
> to
> - * write the rest of it anymore, e.g. when a 408 Request timeout was
> - $ received.
> - * Remove it from our queue and loop to read another response.
> - */
> - if (request == conn->written_reqs) {
> - conn->written_reqs = request->next;
> - conn->nr_of_written_reqs--;
> - } else {
> - conn->unwritten_reqs = request->next;
> - conn->nr_of_unwritten_reqs--;
> - }
> -
> - serf__destroy_request(request);
> -
> - request = conn->written_reqs;
> - if (!request) {
> - /* Received responses for all written requests */
> - conn->written_reqs_tail = NULL;
> - /* Request wasn't completely written yet! */
> - request = conn->unwritten_reqs;
> - if (!request)
> - conn->unwritten_reqs_tail = NULL;
> - }
> -
> - conn->completed_responses++;
> -
> - /* We have received a response. If there are no more outstanding
> - requests on this connection, we should stop polling for READ
> events
> - for now. */
> - if (!conn->written_reqs && !conn->unwritten_reqs) {
> - serf_io__set_pollset_dirty(&conn->io);
> - }
> -
> - /* This means that we're being advised that the connection is done.
> */
> - if (close_connection == SERF_ERROR_CLOSING) {
> - reset_connection(conn, 1);
> - if (APR_STATUS_IS_EOF(status))
> - status = APR_SUCCESS;
> - goto error;
> - }
> -
> - /* The server is suddenly deciding to serve more responses than we've
> - * seen before.
> - *
> - * Let our requests go.
> - */
> - if (conn->probable_keepalive_limit &&
> - conn->completed_responses >= conn->probable_keepalive_limit) {
> - conn->probable_keepalive_limit = 0;
> - }
> -
> - /* If we just ran out of requests or have unwritten requests, then
> - * update the pollset. We don't want to read from this socket any
> - * more. We are definitely done with this loop, too.
> - */
> - if (request == NULL || request->writing == SERF_WRITING_NONE) {
> - serf_io__set_pollset_dirty(&conn->io);
> - status = APR_SUCCESS;
> - goto error;
> - }
> - }
> -
> -error:
> - /* ### This code handles some specific errors as a retry.
> - Eventually we should move to a handling where the application
> - can tell us if this is really a good idea for specific requests */
> -
> - if (status == SERF_ERROR_SSL_NEGOTIATE_IN_PROGRESS) {
> - /* This connection uses HTTP pipelining and the server asked for a
> - renegotiation (e.g. to access the requested resource a specific
> - client certificate is required).
> -
> - Because of a known problem in OpenSSL this won't work most of the
> - time, so as a workaround, when the server asks for a renegotiation
> - on a connection using HTTP pipelining, we reset the connection,
> - disable pipelining and reconnect to the server. */
> - serf__log(LOGLVL_WARNING, LOGCOMP_CONN, __FILE__, conn-
> >config,
> - "The server requested renegotiation. Disable HTTP "
> - "pipelining and reset the connection.\n", conn);
> -
> - serf__connection_set_pipelining(conn, 0);
> - reset_connection(conn, 1);
> - status = APR_SUCCESS;
> - }
> - else if (status == SERF_ERROR_REQUEST_LOST
> - || APR_STATUS_IS_ECONNRESET(status)
> - || APR_STATUS_IS_ECONNABORTED(status)) {
> -
> - /* Some systems will not generate a HUP poll event for these errors
> - so we handle the ECONNRESET issue and ECONNABORT here. */
> -
> - /* If the connection was ever good, be optimistic & try again.
> - If it has never tried again (incl. a retry), fail. */
> - if (conn->completed_responses) {
> - reset_connection(conn, 1);
> - status = APR_SUCCESS;
> - }
> - else if (status == SERF_ERROR_REQUEST_LOST) {
> - status = SERF_ERROR_ABORTED_CONNECTION;
> - }
> - }
> -
> - apr_pool_destroy(tmppool);
> - return status;
> -}
> -
> -/* The connection got reset by the server. On Windows this can happen
> - when all data is read, so just cleanup the connection and open a new one.
> -
> - If we haven't had any successful responses on this connection,
> - then error out as it is likely a server issue. */
> -static apr_status_t hangup_connection(serf_connection_t *conn)
> -{
> - if (conn->completed_responses) {
> - return reset_connection(conn, 1);
> - }
> -
> - return SERF_ERROR_ABORTED_CONNECTION;
> -}
> -
> -/* process all events on the connection */
> -static apr_status_t process_connection(serf_connection_t *conn,
> - apr_int16_t events)
> -{
> - apr_status_t status;
> -#ifdef SERF_DEBUG_BUCKET_USE
> - serf_request_t *rq;
> -#endif
> -
> -#ifdef SERF_DEBUG_BUCKET_USE
> - serf_debug__entered_loop(conn->allocator);
> -
> - for (rq = conn->written_reqs; rq; rq = rq->next) {
> - if (rq->allocator)
> - serf_debug__entered_loop(rq->allocator);
> - }
> -
> - for (rq = conn->done_reqs; rq; rq = rq->next) {
> - if (rq->allocator)
> - serf_debug__entered_loop(rq->allocator);
> - }
> -#endif
> -
> - /* POLLHUP/ERR should come after POLLIN so if there's an error message
> or
> - * the like sitting on the connection, we give the app a chance to read
> - * it before we trigger a reset condition.
> - */
> - if ((events & APR_POLLIN) != 0
> - && !conn->wait_for_connect) {
> -
> - if ((status = conn->perform_read(conn)) != APR_SUCCESS)
> - return status;
> -
> - /* If we decided to reset our connection, return now as we don't
> - * want to write.
> - */
> - if ((conn->seen_in_pollset & APR_POLLHUP) != 0) {
> - return APR_SUCCESS;
> - }
> - }
> - if ((events & APR_POLLHUP) != 0) {
> - /* The connection got reset by the server. */
> - return conn->perform_hangup(conn);
> - }
> - if ((events & APR_POLLERR) != 0) {
> - /* We might be talking to a buggy HTTP server that doesn't
> - * do lingering-close. (httpd < 2.1.8 does this.)
> - *
> - * See:
> - *
> - * http://issues.apache.org/bugzilla/show_bug.cgi?id=35292
> - */
> - if (conn->completed_requests && !conn->probable_keepalive_limit) {
> - return reset_connection(conn, 1);
> - }
> -#ifdef SO_ERROR
> - /* If possible, get the error from the platform's socket layer and
> - convert it to an APR status code. */
> - {
> - apr_os_sock_t osskt;
> - if (!apr_os_sock_get(&osskt, conn->skt)) {
> - int error;
> - apr_socklen_t l = sizeof(error);
> -
> - if (!getsockopt(osskt, SOL_SOCKET, SO_ERROR, (char*)&error,
> - &l)) {
> - status = APR_FROM_OS_ERROR(error);
> -
> - /* Handle fallback for multi-homed servers.
> -
> - ### Improve algorithm to find better than just 'next'?
> -
> - Current Windows versions already handle re-ordering
> for
> - api users by using statistics on the recently failed
> - connections to order the list of addresses. */
> - if (conn->completed_requests == 0
> - && conn->address->next != NULL
> - && (APR_STATUS_IS_ECONNREFUSED(status)
> - || APR_STATUS_IS_TIMEUP(status)
> - || APR_STATUS_IS_ENETUNREACH(status))) {
> -
> - conn->address = conn->address->next;
> - return reset_connection(conn, 1);
> - }
> -
> - return status;
> - }
> - }
> - }
> -#endif
> - return APR_EGENERAL;
> - }
> - if ((events & APR_POLLOUT) != 0) {
> - if (conn->wait_for_connect) {
> - conn->wait_for_connect = FALSE;
> -
> - /* We are now connected. Socket is now usable */
> - serf_io__set_pollset_dirty(&conn->io);
> -
> - if ((status = connect_connection(conn)) != APR_SUCCESS)
> - return status;
> - }
> -
> - if ((status = conn->perform_write(conn)) != APR_SUCCESS)
> - return status;
> - }
> - return APR_SUCCESS;
> -}
> -
> -apr_status_t serf__process_connection(serf_connection_t *conn,
> - apr_int16_t events)
> -{
> - serf_context_t *ctx = conn->ctx;
> - apr_pollfd_t tdesc = { 0 };
> -
> - /* If this connection has already failed, return the error again, and try
> - * to remove it from the pollset again
> - */
> - if (conn->status) {
> - tdesc.desc_type = APR_POLL_SOCKET;
> - tdesc.desc.s = conn->skt;
> - tdesc.reqevents = conn->io.reqevents;
> - ctx->pollset_rm(ctx->pollset_baton,
> - &tdesc, &conn->io);
> - return conn->status;
> - }
> - /* apr_pollset_poll() can return a conn multiple times... */
> - if ((conn->seen_in_pollset & events) != 0 ||
> - (conn->seen_in_pollset & APR_POLLHUP) != 0) {
> - return APR_SUCCESS;
> - }
> -
> - conn->seen_in_pollset |= events;
> -
> - if ((conn->status = process_connection(conn, events)) != APR_SUCCESS)
> - {
> - /* it's possible that the connection was already reset and thus the
> - socket cleaned up. */
> - if (conn->skt) {
> - tdesc.desc_type = APR_POLL_SOCKET;
> - tdesc.desc.s = conn->skt;
> - tdesc.reqevents = conn->io.reqevents;
> - ctx->pollset_rm(ctx->pollset_baton,
> - &tdesc, &conn->io);
> - }
> - return conn->status;
> - }
> - return APR_SUCCESS;
> -}
> -
> -serf_connection_t *serf_connection_create(
> - serf_context_t *ctx,
> - apr_sockaddr_t *address,
> - serf_connection_setup_t setup,
> - void *setup_baton,
> - serf_connection_closed_t closed,
> - void *closed_baton,
> - apr_pool_t *pool)
> -{
> - serf_connection_t *conn = apr_pcalloc(pool, sizeof(*conn));
> -
> - conn->ctx = ctx;
> - conn->status = APR_SUCCESS;
> - /* Ignore server address if proxy was specified. */
> - conn->address = ctx->proxy_address ? ctx->proxy_address : address;
> - conn->setup = setup;
> - conn->setup_baton = setup_baton;
> - conn->closed = closed;
> - conn->closed_baton = closed_baton;
> - conn->pool = pool;
> - conn->allocator = serf_bucket_allocator_create(pool, NULL, NULL);
> - conn->stream = NULL;
> - conn->ostream_head = NULL;
> - conn->ostream_tail = NULL;
> - conn->io.type = SERF_IO_CONN;
> - conn->io.u.conn = conn;
> - conn->io.ctx = ctx;
> - conn->io.dirty_conn = false;
> - conn->io.reqevents = 0;
> - conn->hit_eof = 0;
> - conn->state = SERF_CONN_INIT;
> - conn->latency = -1; /* unknown */
> - conn->stop_writing = 0;
> - conn->write_now = 0;
> - conn->wait_for_connect = 0;
> - conn->pipelining = 1;
> - conn->framing_type = SERF_CONNECTION_FRAMING_TYPE_HTTP1;
> - conn->perform_read = read_from_connection;
> - conn->perform_write = write_to_connection;
> - conn->perform_hangup = hangup_connection;
> - conn->perform_teardown = NULL;
> - conn->protocol_baton = NULL;
> -
> - conn->written_reqs = conn->written_reqs_tail = NULL;
> - conn->nr_of_written_reqs = 0;
> -
> - conn->unwritten_reqs = conn->unwritten_reqs_tail = NULL;
> - conn->nr_of_unwritten_reqs;
> -
> - conn->done_reqs = conn->done_reqs_tail = 0;
> -
> - /* Create a subpool for our connection. */
> - apr_pool_create(&conn->skt_pool, conn->pool);
> -
> - /* register a cleanup */
> - apr_pool_cleanup_register(conn->pool, conn, clean_conn,
> - apr_pool_cleanup_null);
> -
> - /* Add the connection to the context. */
> - *(serf_connection_t **)apr_array_push(ctx->conns) = conn;
> -
> - return conn;
> -}
> -
> -apr_status_t serf_connection_create2(
> - serf_connection_t **conn,
> - serf_context_t *ctx,
> - apr_uri_t host_info,
> - serf_connection_setup_t setup,
> - void *setup_baton,
> - serf_connection_closed_t closed,
> - void *closed_baton,
> - apr_pool_t *pool)
> -{
> - apr_status_t status = APR_SUCCESS;
> - serf_config_t *config;
> - serf_connection_t *c;
> - apr_sockaddr_t *host_address = NULL;
> -
> - /* Set the port number explicitly, needed to create the socket later. */
> - if (!host_info.port) {
> - host_info.port = apr_uri_port_of_scheme(host_info.scheme);
> - }
> -
> - /* Only lookup the address of the server if no proxy server was
> - configured. */
> - if (!ctx->proxy_address) {
> - status = apr_sockaddr_info_get(&host_address,
> - host_info.hostname,
> - APR_UNSPEC, host_info.port, 0, pool);
> - if (status)
> - return status;
> - }
> -
> - c = serf_connection_create(ctx, host_address, setup, setup_baton,
> - closed, closed_baton, pool);
> -
> - /* We're not interested in the path following the hostname. */
> - c->host_url = apr_uri_unparse(c->pool,
> - &host_info,
> - APR_URI_UNP_OMITPATHINFO |
> - APR_URI_UNP_OMITUSERINFO);
> -
> - /* Store the host info without the path on the connection. */
> - (void)apr_uri_parse(c->pool, c->host_url, &(c->host_info));
> - if (!c->host_info.port) {
> - c->host_info.port = apr_uri_port_of_scheme(c->host_info.scheme);
> - }
> -
> - /* Store the connection specific info in the configuration store */
> - status = serf__config_store_get_config(ctx, c, &config, pool);
> - if (status)
> - return status;
> - c->config = config;
> - serf_config_set_stringc(config, SERF_CONFIG_HOST_NAME,
> - c->host_info.hostname);
> - serf_config_set_stringc(config, SERF_CONFIG_HOST_PORT,
> - apr_itoa(ctx->pool, c->host_info.port));
> -
> - *conn = c;
> -
> - serf__log(LOGLVL_DEBUG, LOGCOMP_CONN, __FILE__, c->config,
> - "created connection 0x%x\n", c);
> -
> - return status;
> -}
> -
> -apr_status_t serf_connection_reset(
> - serf_connection_t *conn)
> -{
> - return reset_connection(conn, 0);
> -}
> -
> -
> -apr_status_t serf_connection_close(
> - serf_connection_t *conn)
> -{
> - int i;
> - serf_context_t *ctx = conn->ctx;
> - apr_status_t status;
> -
> - for (i = ctx->conns->nelts; i--; ) {
> - serf_connection_t *conn_seq = GET_CONN(ctx, i);
> -
> - if (conn_seq == conn) {
> -
> - /* Clean up the write bucket first, as this marks all partially written
> - requests as fully written, allowing more efficient cleanup */
> - serf__connection_pre_cleanup(conn);
> -
> - /* The application asked to close the connection, no need to notify
> - it for each cancelled request. */
> - while (conn->written_reqs) {
> - serf__cancel_request(conn->written_reqs, &conn->written_reqs, 0);
> - }
> - while (conn->unwritten_reqs) {
> - serf__cancel_request(conn->unwritten_reqs, &conn->unwritten_reqs, 0);
> - }
> - if (conn->skt != NULL) {
> - remove_connection(ctx, conn);
> - status = clean_skt(conn);
> - if (conn->closed != NULL) {
> - handle_conn_closed(conn, status);
> - }
> - }
> - if (conn->stream != NULL) {
> - serf_bucket_destroy(conn->stream);
> - conn->stream = NULL;
> - }
> -
> - if (conn->protocol_baton) {
> - conn->perform_teardown(conn);
> - conn->protocol_baton = NULL;
> - }
> -
> - /* Remove the connection from the context. We don't want to
> - * deal with it any more.
> - */
> - if (i < ctx->conns->nelts - 1) {
> - /* move later connections over this one. */
> - memmove(
> - &GET_CONN(ctx, i),
> - &GET_CONN(ctx, i + 1),
> - (ctx->conns->nelts - i - 1) * sizeof(serf_connection_t *));
> - }
> - --ctx->conns->nelts;
> -
> - serf__log(LOGLVL_DEBUG, LOGCOMP_CONN, __FILE__, conn->config,
> - "closed connection 0x%x\n", conn);
> -
> - /* Found the connection. Closed it. All done. */
> - return APR_SUCCESS;
> - }
> - }
> -
> - /* We didn't find the specified connection. */
> - /* ### doc talks about this w.r.t poll structures. use something else? */
> - return APR_NOTFOUND;
> -}
> -
> -
> -void serf_connection_set_max_outstanding_requests(
> - serf_connection_t *conn,
> - unsigned int max_requests)
> -{
> - if (max_requests == 0)
> - serf__log(LOGLVL_DEBUG, LOGCOMP_CONN, __FILE__, conn->config,
> - "Set max. nr. of outstanding requests for this "
> - "connection to unlimited.\n");
> - else
> - serf__log(LOGLVL_DEBUG, LOGCOMP_CONN, __FILE__, conn->config,
> - "Limit max. nr. of outstanding requests for this "
> - "connection to %u.\n", max_requests);
> -
> - conn->max_outstanding_requests = max_requests;
> -}
> -
> -/* Disable HTTP pipelining, ensure that only one request is outstanding at a
> - time. This is an internal method, an application that wants to disable
> - HTTP pipelining can achieve this by calling:
> - serf_connection_set_max_outstanding_requests(conn, 1) .
> - */
> -void serf__connection_set_pipelining(serf_connection_t *conn, int enabled)
> -{
> - conn->pipelining = enabled;
> -}
> -
> -void serf_connection_set_async_responses(
> - serf_connection_t *conn,
> - serf_response_acceptor_t acceptor,
> - void *acceptor_baton,
> - serf_response_handler_t handler,
> - void *handler_baton)
> -{
> - conn->async_responses = 1;
> - conn->async_acceptor = acceptor;
> - conn->async_acceptor_baton = acceptor_baton;
> - conn->async_handler = handler;
> - conn->async_handler_baton = handler_baton;
> -}
> -
> -void serf_connection_set_framing_type(
> - serf_connection_t *conn,
> - serf_connection_framing_type_t framing_type)
> -{
> - conn->framing_type = framing_type;
> -
> - if (conn->skt) {
> - serf_io__set_pollset_dirty(&conn->io);
> - conn->stop_writing = 0;
> - conn->write_now = 1;
> -
> - /* Close down existing protocol */
> - if (conn->protocol_baton) {
> - conn->perform_teardown(conn);
> - conn->protocol_baton = NULL;
> - }
> - }
> -
> - /* Reset to default */
> - conn->perform_read = read_from_connection;
> - conn->perform_write = write_to_connection;
> - conn->perform_hangup = hangup_connection;
> - conn->perform_teardown = NULL;
> -
> - switch (framing_type) {
> - case SERF_CONNECTION_FRAMING_TYPE_HTTP2:
> - serf__http2_protocol_init(conn);
> - break;
> - default:
> - break;
> - }
> -}
> -
> -apr_interval_time_t serf_connection_get_latency(serf_connection_t *conn)
> -{
> - if (conn->ctx->proxy_address) {
> - /* Detecting network latency for proxied connection is not implemented
> - yet. */
> - return -1;
> - }
> -
> - return conn->latency;
> -}
> -
> -unsigned int serf_connection_queued_requests(serf_connection_t *conn)
> -{
> - return conn->nr_of_unwritten_reqs;
> -}
> -
> -unsigned int serf_connection_pending_requests(serf_connection_t *conn)
> -{
> - return conn->nr_of_unwritten_reqs + conn->nr_of_written_reqs;
> -}
>
> Modified: serf/trunk/serf_private.h
> URL: http://svn.apache.org/viewvc/serf/trunk/serf_private.h?rev=1715005&r1=1715004&r2=1715005&view=diff
> ==============================================================================
> --- serf/trunk/serf_private.h (original)
> +++ serf/trunk/serf_private.h Wed Nov 18 14:45:58 2015
> @@ -147,6 +147,33 @@ typedef struct serf_io_baton_t {
>
> } serf_io_baton_t;
>
> +typedef struct serf_pump_io_t
> +{
> + serf_io_baton_t *io;
> +
> + serf_bucket_alloc_t *allocator;
> + serf_config_t *config;
> +
> + serf_bucket_t *stream;
> + serf_bucket_t *ostream_head;
> + serf_bucket_t *ostream_tail;
> +
> + apr_socket_t *skt;
> +
> + /* Outgoing vecs, waiting to be written.
> + Read from ostream_head */
> + struct iovec vec[IOV_MAX];
> + int vec_len;
> +
> + /* True when connection failed while writing */
> + bool done_writing;
> + bool stop_writing; /* Wait for read (E.g. SSL) */
> +
> + /* Set to true when ostream_tail was read to EOF */
> + bool hit_eof;
> +} serf_pump_t;
> +
> +
> /* Should we use static APR_INLINE instead? */
> #define serf_io__set_pollset_dirty(io_baton) \
> do \
> @@ -381,6 +408,7 @@ struct serf_incoming_t {
> serf_context_t *ctx;
>
> serf_io_baton_t io;
> + serf_pump_t pump;
> serf_incoming_request_setup_t req_setup;
> void *req_setup_baton;
>
> @@ -403,8 +431,6 @@ struct serf_incoming_t {
> serf_connection_framing_type_t framing_type;
>
> bool wait_for_connect;
> - bool hit_eof;
> - bool stop_writing;
>
> /* Event callbacks, called from serf__process_client() to do the actual
> processing. */
> @@ -416,13 +442,6 @@ struct serf_incoming_t {
> void(*perform_teardown)(serf_incoming_t *conn);
> void *protocol_baton;
>
> - /* A bucket wrapped around our socket (for reading responses). */
> - serf_bucket_t *stream;
> - /* A reference to the aggregate bucket that provides the boundary between
> - * request level buckets and connection level buckets.
> - */
> - serf_bucket_t *ostream_head;
> - serf_bucket_t *ostream_tail;
>
> serf_config_t *config;
>
> @@ -742,6 +761,25 @@ void serf__link_requests(serf_request_t
> apr_status_t serf__handle_response(serf_request_t *request,
> apr_pool_t *pool);
>
> +/* From pump.c */
> +void serf_pump__init(serf_pump_t *pump,
> + serf_io_baton_t *io,
> + apr_socket_t *skt,
> + serf_config_t *config,
> + serf_bucket_alloc_t *allocator,
> + apr_pool_t *pool);
> +
> +bool serf_pump__data_pending(serf_pump_t *pump);
> +void serf_pump__store_ipaddresses_in_config(serf_pump_t *pump);
> +
> +apr_status_t serf_pump__write(serf_pump_t *pump,
> + bool fetch_new);
> +
> +/* These must always be called as a pair to avoid a memory leak */
> +void serf_pump__prepare_setup(serf_pump_t *pump);
> +void serf_pump__complete_setup(serf_pump_t *pump, serf_bucket_t *ostream);
> +
> +
> /** Logging functions. **/
>
> /* Initialize the logging subsystem. This will store a log baton in the
>