Author: rhuijben Date: Wed Nov 18 16:56:09 2015 New Revision: 1715022 URL: http://svn.apache.org/viewvc?rev=1715022&view=rev Log: Start using the bucket pump for outgoing connections. This switches some of the easy code over to using the pump, while other parts now use the pump state directly.
* incoming.c (client_connected): Update caller. * outgoing.c (data_pending): Remove function. (request_or_data_pending): Update caller. (serf__conn_update_pollset): Use serf_pump__data_pending(). (serf__connection_pre_cleanup): Use the pump for cleaning. (do_conn_setup): Use new helper functions. (prepare_conn_streams): Update usage. (store_ipaddresses_in_config): Remove function. (connect_connection): Update caller. (serf__open_connections): Initialize pump. (no_more_writes): Update user. (reset_connection): Update caller. (socket_writev): Switch argument type. (serf__connection_flush): Use pump vars. (handle_async_response, read_from_connection): Update caller. (serf_connection_create): Simplify init. * protocols/fcgi_protocol.c (fcgi_outgoing_read, serf__fcgi_protocol_init): Update init. * protocols/http2_protocol.c (serf__http2_protocol_init, http2_outgoing_read): Update init. * pump.c (serf_pump__init): Init pool. (serf_pump__done): New function. (serf_pump__complete_setup): Add stream argument. * serf_private.h (serf_pump_t): Fix name. Add pool. (serf_connection_t): Add pump. Remove old vars. (serf_pump__done): New function. (serf_pump__complete_setup): New function. * ssltunnel.c (handle_response, detect_eof, serf__ssltunnel_connect): Update usages. Modified: serf/trunk/incoming.c serf/trunk/outgoing.c serf/trunk/protocols/fcgi_protocol.c serf/trunk/protocols/http2_protocol.c serf/trunk/pump.c serf/trunk/serf_private.h serf/trunk/ssltunnel.c Modified: serf/trunk/incoming.c URL: http://svn.apache.org/viewvc/serf/trunk/incoming.c?rev=1715022&r1=1715021&r2=1715022&view=diff ============================================================================== --- serf/trunk/incoming.c (original) +++ serf/trunk/incoming.c Wed Nov 18 16:56:09 2015 @@ -32,6 +32,7 @@ static apr_status_t client_connected(ser { /* serf_context_t *ctx = client->ctx; */ apr_status_t status; + serf_bucket_t *stream; serf_bucket_t *ostream; serf_pump__store_ipaddresses_in_config(&client->pump); @@ -46,17 +47,17 @@ static apr_status_t client_connected(ser ostream = client->pump.ostream_tail; status = client->setup(client->skt, - &client->pump.stream, + &stream, &ostream, client->setup_baton, client->pool); if (status) { - serf_pump__complete_setup(&client->pump, NULL); + serf_pump__complete_setup(&client->pump, NULL, NULL); /* ### Cleanup! (serf__connection_pre_cleanup) */ return status; } - serf_pump__complete_setup(&client->pump, ostream); + serf_pump__complete_setup(&client->pump, stream, ostream); if (client->framing_type == SERF_CONNECTION_FRAMING_TYPE_NONE) { client->proto_peek_bkt = serf_bucket_aggregate_create( Modified: serf/trunk/outgoing.c URL: http://svn.apache.org/viewvc/serf/trunk/outgoing.c?rev=1715022&r1=1715021&r2=1715022&view=diff ============================================================================== --- serf/trunk/outgoing.c (original) +++ serf/trunk/outgoing.c Wed Nov 18 16:56:09 2015 @@ -67,36 +67,6 @@ static apr_status_t clean_conn(void *dat return APR_SUCCESS; } -/* Safely check if there is still data pending on the connection, carefull - to not accidentally make it invalid. */ -static int -data_pending(serf_connection_t *conn) -{ - if (conn->vec_len > 0) - return TRUE; /* We can't poll right now! */ - - if (conn->ostream_head) { - const char *dummy; - apr_size_t len; - apr_status_t status; - - status = serf_bucket_peek(conn->ostream_head, &dummy, - &len); - if (!SERF_BUCKET_READ_ERROR(status)) { - if (len > 0) { - serf__log(LOGLVL_DEBUG, LOGCOMP_CONN, __FILE__, conn->config, - "Extra data to be written after sending complete " - "requests.\n"); - return TRUE; - } - } - else - return TRUE; /* Sure, we have data (an error) */ - } - - return FALSE; -} - static int request_pending(serf_request_t **next_req, serf_connection_t *conn) { @@ -136,7 +106,7 @@ request_or_data_pending(serf_request_t * if (request_pending(next_req, conn)) return TRUE; - return data_pending(conn); + return serf_pump__data_pending(&conn->pump); } /* Update the pollset for this connection. We tweak the pollset based on @@ -181,41 +151,8 @@ apr_status_t serf__conn_update_pollset(s But it also has the nice side effect of removing references from the aggregate to requests that are done. */ - if (conn->vec_len) { - /* We still have vecs in the connection, which lifetime is - managed by buckets inside conn->ostream_head. - - Don't touch ostream as that might destroy the vecs */ - - data_waiting = (conn->state != SERF_CONN_CLOSING); - } - else { - serf_bucket_t *ostream; - - ostream = conn->ostream_head; - - if (!ostream) - ostream = conn->ssltunnel_ostream; - - if (ostream) { - const char *dummy_data; - apr_size_t len; - - status = serf_bucket_peek(ostream, &dummy_data, &len); - - if (SERF_BUCKET_READ_ERROR(status) || len > 0) { - /* DATA or error waiting */ - data_waiting = TRUE; /* Error waiting */ - } - else if (! status || APR_STATUS_IS_EOF(status)) { - data_waiting = FALSE; - } - else - data_waiting = FALSE; /* EAGAIN / EOF / WAIT_CONN */ - } - else - data_waiting = FALSE; - } + data_waiting = serf_pump__data_pending(&conn->pump) + && (conn->state != SERF_CONN_CLOSING); if (data_waiting) { desc.reqevents |= APR_POLLOUT; @@ -229,7 +166,7 @@ apr_status_t serf__conn_update_pollset(s desc.reqevents |= APR_POLLIN; /* Don't write if OpenSSL told us that it needs to read data first. */ - if (! conn->stop_writing && !data_waiting) { + if (! conn->pump.stop_writing && !data_waiting) { /* This check is duplicated in write_to_connection() */ if ((conn->probable_keepalive_limit && @@ -288,16 +225,10 @@ static void check_buckets_drained(serf_c void serf__connection_pre_cleanup(serf_connection_t *conn) { serf_request_t *rq; - conn->vec_len = 0; + conn->pump.vec_len = 0; + + serf_pump__done(&conn->pump); - if (conn->ostream_head != NULL) { -#ifdef SERF_DEBUG_BUCKET_USE - serf__bucket_drain(conn->ostream_head); -#endif - serf_bucket_destroy(conn->ostream_head); - conn->ostream_head = NULL; - conn->ostream_tail = NULL; - } if (conn->ssltunnel_ostream != NULL) { serf_bucket_destroy(conn->ssltunnel_ostream); conn->ssltunnel_ostream = NULL; @@ -324,58 +255,32 @@ void serf__connection_pre_cleanup(serf_c conn->done_reqs = conn->done_reqs_tail = NULL; } -static apr_status_t detect_eof(void *baton, serf_bucket_t *aggregate_bucket) -{ - serf_connection_t *conn = baton; - conn->hit_eof = 1; - return APR_EAGAIN; -} - static apr_status_t do_conn_setup(serf_connection_t *conn) { apr_status_t status; - serf_bucket_t *ostream; + serf_bucket_t *ostream, *stream; /* ### dunno what the hell this is about. this latency stuff got ### added, and who knows whether it should stay... */ conn->latency = apr_time_now() - conn->connect_time; - if (conn->ostream_head == NULL) { - conn->ostream_head = serf_bucket_aggregate_create(conn->allocator); - } + serf_pump__prepare_setup(&conn->pump); - if (conn->ostream_tail == NULL) { - conn->ostream_tail = serf_bucket_aggregate_create(conn->allocator); - - serf_bucket_aggregate_hold_open(conn->ostream_tail, detect_eof, conn); - } - - ostream = conn->ostream_tail; + ostream = conn->pump.ostream_tail; status = (*conn->setup)(conn->skt, - &conn->stream, + &stream, &ostream, conn->setup_baton, conn->pool); if (status) { /* extra destroy here since it wasn't added to the head bucket yet. */ - serf_bucket_destroy(conn->ostream_tail); + serf_pump__complete_setup(&conn->pump, NULL, NULL); serf__connection_pre_cleanup(conn); return status; } - /* Share the configuration with all the buckets in the newly created output - chain (see PLAIN or ENCRYPTED scenario's), including the request buckets - created by the application (ostream_tail will handle this for us). */ - serf_bucket_set_config(conn->ostream_head, conn->config); - - /* Share the configuration with the ssl_decrypt and socket buckets. The - response buckets wrapping the ssl_decrypt/socket buckets won't get the - config automatically because they are upstream. */ - serf_bucket_set_config(conn->stream, conn->config); - - serf_bucket_aggregate_append(conn->ostream_head, - ostream); + serf_pump__complete_setup(&conn->pump, stream, ostream); /* We typically have one of two scenarios, based on whether the application decided to encrypt this connection: @@ -419,22 +324,23 @@ static apr_status_t prepare_conn_streams /* If the connection does not have an associated bucket, then * call the setup callback to get one. */ - if (conn->stream == NULL) { + if (conn->pump.stream == NULL) { status = do_conn_setup(conn); if (status) { return status; } } - *ostreamt = conn->ostream_tail; - *ostreamh = conn->ostream_head; + *ostreamt = conn->pump.ostream_tail; + *ostreamh = conn->pump.ostream_head; } else if (conn->state == SERF_CONN_SETUP_SSLTUNNEL) { /* SSL tunnel needed and not set up yet, get a direct unencrypted stream for this socket */ - if (conn->stream == NULL) { - conn->stream = serf_context_bucket_socket_create(conn->ctx, - conn->skt, - conn->allocator); + if (conn->pump.stream == NULL) { + conn->pump.stream = + serf_context_bucket_socket_create(conn->ctx, + conn->skt, + conn->allocator); } /* Don't create the ostream bucket chain including the ssl_encrypt @@ -444,38 +350,19 @@ static apr_status_t prepare_conn_streams } else { /* SERF_CONN_CLOSING or SERF_CONN_INIT */ - *ostreamt = conn->ostream_tail; - *ostreamh = conn->ostream_head; + *ostreamt = conn->pump.ostream_tail; + *ostreamh = conn->pump.ostream_head; } return APR_SUCCESS; } -static void store_ipaddresses_in_config(serf_config_t *config, - apr_socket_t *skt) -{ - apr_sockaddr_t *sa; - - if (apr_socket_addr_get(&sa, APR_LOCAL, skt) == APR_SUCCESS) { - char buf[48]; - if (!apr_sockaddr_ip_getbuf(buf, sizeof(buf), sa)) - serf_config_set_stringf(config, SERF_CONFIG_CONN_LOCALIP, - "%s:%d", buf, sa->port); - } - if (apr_socket_addr_get(&sa, APR_REMOTE, skt) == APR_SUCCESS) { - char buf[48]; - if (!apr_sockaddr_ip_getbuf(buf, sizeof(buf), sa)) - serf_config_set_stringf(config, SERF_CONFIG_CONN_REMOTEIP, - "%s:%d", buf, sa->port); - } -} - static apr_status_t connect_connection(serf_connection_t *conn) { serf_context_t *ctx = conn->ctx; apr_status_t status; - store_ipaddresses_in_config(conn->config, conn->skt); + serf_pump__store_ipaddresses_in_config(&conn->pump); serf__log(LOGLVL_DEBUG, LOGCOMP_CONN, __FILE__, conn->config, "socket for conn 0x%x connected\n", conn); @@ -581,6 +468,9 @@ apr_status_t serf__open_connections(serf if (status) return status; + serf_pump__init(&conn->pump, &conn->io, skt, conn->config, + conn->allocator, conn->pool); + /* Flag our pollset as dirty now that we have a new socket. */ serf_io__set_pollset_dirty(&conn->io); @@ -603,7 +493,7 @@ static apr_status_t no_more_writes(serf_ "stop writing on conn 0x%x\n", conn); /* Clear our iovec. */ - conn->vec_len = 0; + conn->pump.vec_len = 0; /* Update the pollset to know we don't want to write on this socket any * more. @@ -717,22 +607,22 @@ static apr_status_t reset_connection(ser } } - if (conn->stream != NULL) { - serf_bucket_destroy(conn->stream); - conn->stream = NULL; + if (conn->pump.stream != NULL) { + serf_bucket_destroy(conn->pump.stream); + conn->pump.stream = NULL; } /* Don't try to resume any writes */ - conn->vec_len = 0; + conn->pump.vec_len = 0; serf_io__set_pollset_dirty(&conn->io); conn->state = SERF_CONN_INIT; - conn->hit_eof = 0; + conn->pump.hit_eof = 0; conn->connect_time = 0; conn->latency = -1; - conn->stop_writing = 0; - conn->write_now = 0; + conn->pump.stop_writing = false; + conn->write_now = false; /* conn->pipelining */ conn->framing_type = SERF_CONNECTION_FRAMING_TYPE_HTTP1; @@ -760,7 +650,7 @@ static apr_status_t reset_connection(ser return APR_SUCCESS; } -static apr_status_t socket_writev(serf_connection_t *conn) +static apr_status_t socket_writev(serf_pump_t *conn) { apr_size_t written; apr_status_t status; @@ -805,7 +695,7 @@ static apr_status_t socket_writev(serf_c serf__log_nopref(LOGLVL_DEBUG, LOGCOMP_RAWMSG, conn->config, "\n"); /* Log progress information */ - serf__context_progress_delta(conn->ctx, 0, written); + serf__context_progress_delta(conn->io->ctx, 0, written); } return status; @@ -818,14 +708,14 @@ apr_status_t serf__connection_flush(serf apr_status_t read_status = APR_SUCCESS; serf_bucket_t *ostreamh = NULL; - conn->hit_eof = FALSE; + conn->pump.hit_eof = FALSE; while (status == APR_SUCCESS) { /* First try to write out what is already stored in the connection vecs. */ - while (conn->vec_len && !status) { - status = socket_writev(conn); + while (conn->pump.vec_len && !status) { + status = socket_writev(&conn->pump); /* If the write would have blocked, then we're done. * Don't try to write anything else to the socket. @@ -838,7 +728,7 @@ apr_status_t serf__connection_flush(serf if (status || !pump) return status; - else if (read_status || conn->vec_len || conn->hit_eof) + else if (read_status || conn->pump.vec_len || conn->pump.hit_eof) return read_status; /* Ok, with the vecs written, we can now refill the per connection @@ -856,12 +746,12 @@ apr_status_t serf__connection_flush(serf data as available, we probably don't want to read ALL_AVAIL, but a lower number, like the size of one or a few TCP packets, the available TCP buffer size ... */ - conn->hit_eof = 0; + conn->pump.hit_eof = 0; read_status = serf_bucket_read_iovec(ostreamh, SERF_READ_ALL_AVAIL, IOV_MAX, - conn->vec, - &conn->vec_len); + conn->pump.vec, + &conn->pump.vec_len); if (read_status == SERF_ERROR_WAIT_CONN) { /* The bucket told us that it can't provide more data until @@ -873,7 +763,7 @@ apr_status_t serf__connection_flush(serf we can actually write something. otherwise, we could end up in a CPU spin: socket wants something, but we don't have anything (and keep returning EAGAIN) */ - conn->stop_writing = 1; + conn->pump.stop_writing = 1; serf_io__set_pollset_dirty(&conn->io); read_status = APR_EAGAIN; @@ -881,7 +771,7 @@ apr_status_t serf__connection_flush(serf else if (APR_STATUS_IS_EAGAIN(read_status)) { /* We read some stuff, but did we read everything ? */ - if (conn->hit_eof) + if (conn->pump.hit_eof) read_status = APR_SUCCESS; } else if (SERF_BUCKET_READ_ERROR(read_status)) { @@ -1087,7 +977,7 @@ static apr_status_t handle_async_respons if (conn->current_async_response == NULL) { conn->current_async_response = - (*conn->async_acceptor)(NULL, conn->stream, + (*conn->async_acceptor)(NULL, conn->pump.stream, conn->async_acceptor_baton, pool); } @@ -1121,8 +1011,8 @@ static apr_status_t read_from_connection /* If the stop_writing flag was set on the connection, reset it now because there is some data to read. */ - if (conn->stop_writing) { - conn->stop_writing = 0; + if (conn->pump.stop_writing) { + conn->pump.stop_writing = false; serf_io__set_pollset_dirty(&conn->io); } @@ -1182,7 +1072,7 @@ static apr_status_t read_from_connection const char *data; apr_size_t len; - status = serf_bucket_peek(conn->stream, &data, &len); + status = serf_bucket_peek(conn->pump.stream, &data, &len); if (APR_STATUS_IS_EOF(status)) { reset_connection(conn, 1); @@ -1199,7 +1089,7 @@ static apr_status_t read_from_connection /* Unexpected response from the server */ if (conn->write_now) { - conn->write_now = 0; + conn->write_now = false; status = conn->perform_write(conn); if (!SERF_BUCKET_READ_ERROR(status)) @@ -1220,7 +1110,7 @@ static apr_status_t read_from_connection return SERF_ERROR_BAD_HTTP_RESPONSE; } - request->resp_bkt = (*request->acceptor)(request, conn->stream, + request->resp_bkt = (*request->acceptor)(request, conn->pump.stream, request->acceptor_baton, tmppool); apr_pool_clear(tmppool); @@ -1554,19 +1444,14 @@ serf_connection_t *serf_connection_creat conn->closed_baton = closed_baton; conn->pool = pool; conn->allocator = serf_bucket_allocator_create(pool, NULL, NULL); - conn->stream = NULL; - conn->ostream_head = NULL; - conn->ostream_tail = NULL; conn->io.type = SERF_IO_CONN; conn->io.u.conn = conn; conn->io.ctx = ctx; conn->io.dirty_conn = false; conn->io.reqevents = 0; - conn->hit_eof = 0; conn->state = SERF_CONN_INIT; conn->latency = -1; /* unknown */ - conn->stop_writing = 0; - conn->write_now = 0; + conn->write_now = false; conn->wait_for_connect = 0; conn->pipelining = 1; conn->framing_type = SERF_CONNECTION_FRAMING_TYPE_HTTP1; @@ -1698,9 +1583,9 @@ apr_status_t serf_connection_close( handle_conn_closed(conn, status); } } - if (conn->stream != NULL) { - serf_bucket_destroy(conn->stream); - conn->stream = NULL; + if (conn->pump.stream != NULL) { + serf_bucket_destroy(conn->pump.stream); + conn->pump.stream = NULL; } if (conn->protocol_baton) { @@ -1782,8 +1667,8 @@ void serf_connection_set_framing_type( if (conn->skt) { serf_io__set_pollset_dirty(&conn->io); - conn->stop_writing = 0; - conn->write_now = 1; + conn->pump.stop_writing = 0; + conn->write_now = true; /* Close down existing protocol */ if (conn->protocol_baton) { Modified: serf/trunk/protocols/fcgi_protocol.c URL: http://svn.apache.org/viewvc/serf/trunk/protocols/fcgi_protocol.c?rev=1715022&r1=1715021&r2=1715022&view=diff ============================================================================== --- serf/trunk/protocols/fcgi_protocol.c (original) +++ serf/trunk/protocols/fcgi_protocol.c Wed Nov 18 16:56:09 2015 @@ -500,7 +500,7 @@ static apr_status_t fcgi_outgoing_read(s serf_fcgi_protocol_t *fcgi = conn->protocol_baton; if (!fcgi->stream) - fcgi->stream = conn->stream; + fcgi->stream = conn->pump.stream; return fcgi_read(fcgi); } @@ -537,8 +537,8 @@ void serf__fcgi_protocol_init(serf_conne fcgi->pool = protocol_pool; fcgi->conn = conn; fcgi->io = &conn->io; - fcgi->stream = conn->stream; - fcgi->ostream = conn->ostream_tail; + fcgi->stream = conn->pump.stream; + fcgi->ostream = conn->pump.ostream_tail; fcgi->allocator = conn->allocator; fcgi->config = conn->config; Modified: serf/trunk/protocols/http2_protocol.c URL: http://svn.apache.org/viewvc/serf/trunk/protocols/http2_protocol.c?rev=1715022&r1=1715021&r2=1715022&view=diff ============================================================================== --- serf/trunk/protocols/http2_protocol.c (original) +++ serf/trunk/protocols/http2_protocol.c Wed Nov 18 16:56:09 2015 @@ -239,8 +239,8 @@ void serf__http2_protocol_init(serf_conn h2->pool = protocol_pool; h2->conn = conn; h2->io = &conn->io; - h2->stream = conn->stream; - h2->ostream = conn->ostream_tail; + h2->stream = conn->pump.stream; + h2->ostream = conn->pump.ostream_tail; h2->allocator = conn->allocator; h2->config = conn->config; @@ -1644,14 +1644,14 @@ http2_outgoing_read(serf_connection_t *c /* If the stop_writing flag was set on the connection, reset it now because there is some data to read. */ - if (conn->stop_writing) + if (conn->pump.stop_writing) { - conn->stop_writing = 0; + conn->pump.stop_writing = false; serf_io__set_pollset_dirty(&conn->io); } if (h2->stream == NULL) - h2->stream = conn->stream; + h2->stream = conn->pump.stream; status = http2_process(h2); Modified: serf/trunk/pump.c URL: http://svn.apache.org/viewvc/serf/trunk/pump.c?rev=1715022&r1=1715021&r2=1715022&view=diff ============================================================================== --- serf/trunk/pump.c (original) +++ serf/trunk/pump.c Wed Nov 18 16:56:09 2015 @@ -58,11 +58,37 @@ void serf_pump__init(serf_pump_t *pump, pump->allocator = allocator; pump->config = config; pump->skt = skt; + pump->pool = pool; apr_pool_cleanup_register(pool, pump, pump_cleanup, apr_pool_cleanup_null); } +void serf_pump__done(serf_pump_t *pump) +{ + if (pump->pool) { + apr_pool_cleanup_run(pump->pool, pump, pump_cleanup); + } + + pump->io = NULL; + pump->allocator = NULL; + pump->config = NULL; + + /* pump->stream is managed by the current reader! */ + + pump->ostream_head = NULL; + pump->ostream_tail = NULL; + + pump->skt = NULL; + pump->vec_len = 0; + + pump->done_writing = false; + pump->stop_writing = false; + pump->hit_eof = false; + + pump->pool = NULL; +} + /* Safely check if there is still data pending on the connection, carefull to not accidentally make it invalid. */ bool serf_pump__data_pending(serf_pump_t *pump) @@ -118,8 +144,10 @@ void serf_pump__prepare_setup(serf_pump_ } void serf_pump__complete_setup(serf_pump_t *pump, + serf_bucket_t *stream, serf_bucket_t *ostream) { + pump->stream = stream; if (ostream) serf_bucket_aggregate_append(pump->ostream_head, ostream); else Modified: serf/trunk/serf_private.h URL: http://svn.apache.org/viewvc/serf/trunk/serf_private.h?rev=1715022&r1=1715021&r2=1715022&view=diff ============================================================================== --- serf/trunk/serf_private.h (original) +++ serf/trunk/serf_private.h Wed Nov 18 16:56:09 2015 @@ -147,14 +147,18 @@ typedef struct serf_io_baton_t { } serf_io_baton_t; -typedef struct serf_pump_io_t +typedef struct serf_pump_t { serf_io_baton_t *io; serf_bucket_alloc_t *allocator; serf_config_t *config; + /* The incoming stream. Stored here for easy access by users, + but not managed as part of the pump */ serf_bucket_t *stream; + + /* The outgoing stream */ serf_bucket_t *ostream_head; serf_bucket_t *ostream_tail; @@ -171,6 +175,8 @@ typedef struct serf_pump_io_t /* Set to true when ostream_tail was read to EOF */ bool hit_eof; + + apr_pool_t *pool; } serf_pump_t; @@ -442,7 +448,6 @@ struct serf_incoming_t { void(*perform_teardown)(serf_incoming_t *conn); void *protocol_baton; - serf_config_t *config; serf_bucket_t *proto_peek_bkt; @@ -464,6 +469,7 @@ struct serf_connection_t { apr_status_t status; serf_io_baton_t io; + serf_pump_t pump; apr_pool_t *pool; serf_bucket_alloc_t *allocator; @@ -496,14 +502,6 @@ struct serf_connection_t { serf_response_handler_t async_handler; void *async_handler_baton; - /* A bucket wrapped around our socket (for reading responses). */ - serf_bucket_t *stream; - /* A reference to the aggregate bucket that provides the boundary between - * request level buckets and connection level buckets. - */ - serf_bucket_t *ostream_head; - serf_bucket_t *ostream_tail; - /* Aggregate bucket used to send the CONNECT request. */ serf_bucket_t *ssltunnel_ostream; @@ -524,9 +522,6 @@ struct serf_connection_t { serf_request_t *done_reqs; serf_request_t *done_reqs_tail; - struct iovec vec[IOV_MAX]; - int vec_len; - serf_connection_setup_t setup; void *setup_baton; serf_connection_closed_t closed; @@ -542,8 +537,6 @@ struct serf_connection_t { only. */ int pipelining; - int hit_eof; - /* Host url, path ommitted, syntax: https://svn.apache.org . */ const char *host_url; @@ -560,11 +553,8 @@ struct serf_connection_t { /* Calculated connection latency. Negative value if latency is unknown. */ apr_interval_time_t latency; - /* Needs to read first before we can write again. */ - int stop_writing; - /* Write out information now */ - int write_now; + bool write_now; /* Wait for connect: connect() returned APR_EINPROGRESS. Socket not usable yet */ @@ -769,6 +759,8 @@ void serf_pump__init(serf_pump_t *pump, serf_bucket_alloc_t *allocator, apr_pool_t *pool); +void serf_pump__done(serf_pump_t *pump); + bool serf_pump__data_pending(serf_pump_t *pump); void serf_pump__store_ipaddresses_in_config(serf_pump_t *pump); @@ -777,7 +769,9 @@ apr_status_t serf_pump__write(serf_pump_ /* These must always be called as a pair to avoid a memory leak */ void serf_pump__prepare_setup(serf_pump_t *pump); -void serf_pump__complete_setup(serf_pump_t *pump, serf_bucket_t *ostream); +void serf_pump__complete_setup(serf_pump_t *pump, + serf_bucket_t *stream, + serf_bucket_t *ostream); /** Logging functions. **/ Modified: serf/trunk/ssltunnel.c URL: http://svn.apache.org/viewvc/serf/trunk/ssltunnel.c?rev=1715022&r1=1715021&r2=1715022&view=diff ============================================================================== --- serf/trunk/ssltunnel.c (original) +++ serf/trunk/ssltunnel.c Wed Nov 18 16:56:09 2015 @@ -111,8 +111,8 @@ static apr_status_t handle_response(serf apr_pool_destroy(ctx->pool); serf_bucket_destroy(conn->ssltunnel_ostream); conn->ssltunnel_ostream = NULL; - serf_bucket_destroy(conn->stream); - conn->stream = NULL; + serf_bucket_destroy(conn->pump.stream); + conn->pump.stream = NULL; ctx = NULL; serf__log(LOGLVL_INFO, LOGCOMP_CONN, __FILE__, conn->config, @@ -173,7 +173,7 @@ static apr_status_t setup_request(serf_r static apr_status_t detect_eof(void *baton, serf_bucket_t *aggregate_bucket) { serf_connection_t *conn = baton; - conn->hit_eof = 1; + conn->pump.hit_eof = true; return APR_EAGAIN; } @@ -182,6 +182,7 @@ apr_status_t serf__ssltunnel_connect(ser { req_ctx_t *ctx; apr_pool_t *ssltunnel_pool; + serf_bucket_t *ssltunnel_ostream; apr_pool_create(&ssltunnel_pool, conn->pool); @@ -190,8 +191,10 @@ apr_status_t serf__ssltunnel_connect(ser ctx->uri = apr_psprintf(ctx->pool, "%s:%d", conn->host_info.hostname, conn->host_info.port); - conn->ssltunnel_ostream = serf_bucket_aggregate_create(conn->allocator); - serf_bucket_aggregate_hold_open(conn->ssltunnel_ostream, detect_eof, conn); + ssltunnel_ostream = serf_bucket_aggregate_create(conn->allocator); + serf_bucket_aggregate_hold_open(ssltunnel_ostream, detect_eof, conn); + + conn->ssltunnel_ostream = ssltunnel_ostream; serf__ssltunnel_request_create(conn, setup_request,