Author: rhuijben
Date: Wed Nov 18 16:56:09 2015
New Revision: 1715022
URL: http://svn.apache.org/viewvc?rev=1715022&view=rev
Log:
Start using the bucket pump for outgoing connections. This switches some of
the easy code over to using the pump, while other parts now use the pump
state directly.
* incoming.c
(client_connected): Update caller.
* outgoing.c
(data_pending): Remove function.
(request_or_data_pending): Update caller.
(serf__conn_update_pollset): Use serf_pump__data_pending().
(serf__connection_pre_cleanup): Use the pump for cleaning.
(detect_eof): Remove function.
(do_conn_setup): Use new helper functions.
(prepare_conn_streams): Update usage.
(store_ipaddresses_in_config): Remove function.
(connect_connection): Update caller.
(serf__open_connections): Initialize pump.
(no_more_writes): Update usage.
(reset_connection): Update caller.
(socket_writev): Switch argument type.
(serf__connection_flush): Use pump vars.
(handle_async_response,
read_from_connection): Update caller.
(serf_connection_create): Simplify init.
(serf_connection_close,
serf_connection_set_framing_type): Update usage.
* protocols/fcgi_protocol.c
(fcgi_outgoing_read,
serf__fcgi_protocol_init): Update init.
* protocols/http2_protocol.c
(serf__http2_protocol_init,
http2_outgoing_read): Update init.
* pump.c
(serf_pump__init): Init pool.
(serf_pump__done): New function.
(serf_pump__complete_setup): Add stream argument.
* serf_private.h
(serf_pump_t): Fix name. Add pool.
(serf_connection_t): Add pump. Remove old vars.
(serf_pump__done): New function.
(serf_pump__complete_setup): Add stream argument.
* ssltunnel.c
(handle_response,
detect_eof,
serf__ssltunnel_connect): Update usages.
Modified:
serf/trunk/incoming.c
serf/trunk/outgoing.c
serf/trunk/protocols/fcgi_protocol.c
serf/trunk/protocols/http2_protocol.c
serf/trunk/pump.c
serf/trunk/serf_private.h
serf/trunk/ssltunnel.c
Modified: serf/trunk/incoming.c
URL:
http://svn.apache.org/viewvc/serf/trunk/incoming.c?rev=1715022&r1=1715021&r2=1715022&view=diff
==============================================================================
--- serf/trunk/incoming.c (original)
+++ serf/trunk/incoming.c Wed Nov 18 16:56:09 2015
@@ -32,6 +32,7 @@ static apr_status_t client_connected(ser
{
/* serf_context_t *ctx = client->ctx; */
apr_status_t status;
+ serf_bucket_t *stream;
serf_bucket_t *ostream;
serf_pump__store_ipaddresses_in_config(&client->pump);
@@ -46,17 +47,17 @@ static apr_status_t client_connected(ser
ostream = client->pump.ostream_tail;
status = client->setup(client->skt,
- &client->pump.stream,
+ &stream,
&ostream,
client->setup_baton, client->pool);
if (status) {
- serf_pump__complete_setup(&client->pump, NULL);
+ serf_pump__complete_setup(&client->pump, NULL, NULL);
/* ### Cleanup! (serf__connection_pre_cleanup) */
return status;
}
- serf_pump__complete_setup(&client->pump, ostream);
+ serf_pump__complete_setup(&client->pump, stream, ostream);
if (client->framing_type == SERF_CONNECTION_FRAMING_TYPE_NONE) {
client->proto_peek_bkt = serf_bucket_aggregate_create(
Modified: serf/trunk/outgoing.c
URL:
http://svn.apache.org/viewvc/serf/trunk/outgoing.c?rev=1715022&r1=1715021&r2=1715022&view=diff
==============================================================================
--- serf/trunk/outgoing.c (original)
+++ serf/trunk/outgoing.c Wed Nov 18 16:56:09 2015
@@ -67,36 +67,6 @@ static apr_status_t clean_conn(void *dat
return APR_SUCCESS;
}
-/* Safely check if there is still data pending on the connection, carefull
- to not accidentally make it invalid. */
-static int
-data_pending(serf_connection_t *conn)
-{
- if (conn->vec_len > 0)
- return TRUE; /* We can't poll right now! */
-
- if (conn->ostream_head) {
- const char *dummy;
- apr_size_t len;
- apr_status_t status;
-
- status = serf_bucket_peek(conn->ostream_head, &dummy,
- &len);
- if (!SERF_BUCKET_READ_ERROR(status)) {
- if (len > 0) {
- serf__log(LOGLVL_DEBUG, LOGCOMP_CONN, __FILE__, conn->config,
- "Extra data to be written after sending complete "
- "requests.\n");
- return TRUE;
- }
- }
- else
- return TRUE; /* Sure, we have data (an error) */
- }
-
- return FALSE;
-}
-
static int
request_pending(serf_request_t **next_req, serf_connection_t *conn)
{
@@ -136,7 +106,7 @@ request_or_data_pending(serf_request_t *
if (request_pending(next_req, conn))
return TRUE;
- return data_pending(conn);
+ return serf_pump__data_pending(&conn->pump);
}
/* Update the pollset for this connection. We tweak the pollset based on
@@ -181,41 +151,8 @@ apr_status_t serf__conn_update_pollset(s
But it also has the nice side effect of removing references
from the aggregate to requests that are done.
*/
- if (conn->vec_len) {
- /* We still have vecs in the connection, which lifetime is
- managed by buckets inside conn->ostream_head.
-
- Don't touch ostream as that might destroy the vecs */
-
- data_waiting = (conn->state != SERF_CONN_CLOSING);
- }
- else {
- serf_bucket_t *ostream;
-
- ostream = conn->ostream_head;
-
- if (!ostream)
- ostream = conn->ssltunnel_ostream;
-
- if (ostream) {
- const char *dummy_data;
- apr_size_t len;
-
- status = serf_bucket_peek(ostream, &dummy_data, &len);
-
- if (SERF_BUCKET_READ_ERROR(status) || len > 0) {
- /* DATA or error waiting */
- data_waiting = TRUE; /* Error waiting */
- }
- else if (! status || APR_STATUS_IS_EOF(status)) {
- data_waiting = FALSE;
- }
- else
- data_waiting = FALSE; /* EAGAIN / EOF / WAIT_CONN */
- }
- else
- data_waiting = FALSE;
- }
+ data_waiting = serf_pump__data_pending(&conn->pump)
+ && (conn->state != SERF_CONN_CLOSING);
if (data_waiting) {
desc.reqevents |= APR_POLLOUT;
@@ -229,7 +166,7 @@ apr_status_t serf__conn_update_pollset(s
desc.reqevents |= APR_POLLIN;
/* Don't write if OpenSSL told us that it needs to read data first. */
- if (! conn->stop_writing && !data_waiting) {
+ if (! conn->pump.stop_writing && !data_waiting) {
/* This check is duplicated in write_to_connection() */
if ((conn->probable_keepalive_limit &&
@@ -288,16 +225,10 @@ static void check_buckets_drained(serf_c
void serf__connection_pre_cleanup(serf_connection_t *conn)
{
serf_request_t *rq;
- conn->vec_len = 0;
+ conn->pump.vec_len = 0;
+
+ serf_pump__done(&conn->pump);
- if (conn->ostream_head != NULL) {
-#ifdef SERF_DEBUG_BUCKET_USE
- serf__bucket_drain(conn->ostream_head);
-#endif
- serf_bucket_destroy(conn->ostream_head);
- conn->ostream_head = NULL;
- conn->ostream_tail = NULL;
- }
if (conn->ssltunnel_ostream != NULL) {
serf_bucket_destroy(conn->ssltunnel_ostream);
conn->ssltunnel_ostream = NULL;
@@ -324,58 +255,32 @@ void serf__connection_pre_cleanup(serf_c
conn->done_reqs = conn->done_reqs_tail = NULL;
}
-static apr_status_t detect_eof(void *baton, serf_bucket_t *aggregate_bucket)
-{
- serf_connection_t *conn = baton;
- conn->hit_eof = 1;
- return APR_EAGAIN;
-}
-
static apr_status_t do_conn_setup(serf_connection_t *conn)
{
apr_status_t status;
- serf_bucket_t *ostream;
+ serf_bucket_t *ostream, *stream;
/* ### dunno what the hell this is about. this latency stuff got
### added, and who knows whether it should stay... */
conn->latency = apr_time_now() - conn->connect_time;
- if (conn->ostream_head == NULL) {
- conn->ostream_head = serf_bucket_aggregate_create(conn->allocator);
- }
+ serf_pump__prepare_setup(&conn->pump);
- if (conn->ostream_tail == NULL) {
- conn->ostream_tail = serf_bucket_aggregate_create(conn->allocator);
-
- serf_bucket_aggregate_hold_open(conn->ostream_tail, detect_eof, conn);
- }
-
- ostream = conn->ostream_tail;
+ ostream = conn->pump.ostream_tail;
status = (*conn->setup)(conn->skt,
- &conn->stream,
+ &stream,
&ostream,
conn->setup_baton,
conn->pool);
if (status) {
/* extra destroy here since it wasn't added to the head bucket yet. */
- serf_bucket_destroy(conn->ostream_tail);
+ serf_pump__complete_setup(&conn->pump, NULL, NULL);
serf__connection_pre_cleanup(conn);
return status;
}
- /* Share the configuration with all the buckets in the newly created output
- chain (see PLAIN or ENCRYPTED scenario's), including the request buckets
- created by the application (ostream_tail will handle this for us). */
- serf_bucket_set_config(conn->ostream_head, conn->config);
-
- /* Share the configuration with the ssl_decrypt and socket buckets. The
- response buckets wrapping the ssl_decrypt/socket buckets won't get the
- config automatically because they are upstream. */
- serf_bucket_set_config(conn->stream, conn->config);
-
- serf_bucket_aggregate_append(conn->ostream_head,
- ostream);
+ serf_pump__complete_setup(&conn->pump, stream, ostream);
/* We typically have one of two scenarios, based on whether the
application decided to encrypt this connection:
@@ -419,22 +324,23 @@ static apr_status_t prepare_conn_streams
/* If the connection does not have an associated bucket, then
* call the setup callback to get one.
*/
- if (conn->stream == NULL) {
+ if (conn->pump.stream == NULL) {
status = do_conn_setup(conn);
if (status) {
return status;
}
}
- *ostreamt = conn->ostream_tail;
- *ostreamh = conn->ostream_head;
+ *ostreamt = conn->pump.ostream_tail;
+ *ostreamh = conn->pump.ostream_head;
} else if (conn->state == SERF_CONN_SETUP_SSLTUNNEL) {
/* SSL tunnel needed and not set up yet, get a direct unencrypted
stream for this socket */
- if (conn->stream == NULL) {
- conn->stream = serf_context_bucket_socket_create(conn->ctx,
- conn->skt,
- conn->allocator);
+ if (conn->pump.stream == NULL) {
+ conn->pump.stream =
+ serf_context_bucket_socket_create(conn->ctx,
+ conn->skt,
+ conn->allocator);
}
/* Don't create the ostream bucket chain including the ssl_encrypt
@@ -444,38 +350,19 @@ static apr_status_t prepare_conn_streams
} else {
/* SERF_CONN_CLOSING or SERF_CONN_INIT */
- *ostreamt = conn->ostream_tail;
- *ostreamh = conn->ostream_head;
+ *ostreamt = conn->pump.ostream_tail;
+ *ostreamh = conn->pump.ostream_head;
}
return APR_SUCCESS;
}
-static void store_ipaddresses_in_config(serf_config_t *config,
- apr_socket_t *skt)
-{
- apr_sockaddr_t *sa;
-
- if (apr_socket_addr_get(&sa, APR_LOCAL, skt) == APR_SUCCESS) {
- char buf[48];
- if (!apr_sockaddr_ip_getbuf(buf, sizeof(buf), sa))
- serf_config_set_stringf(config, SERF_CONFIG_CONN_LOCALIP,
- "%s:%d", buf, sa->port);
- }
- if (apr_socket_addr_get(&sa, APR_REMOTE, skt) == APR_SUCCESS) {
- char buf[48];
- if (!apr_sockaddr_ip_getbuf(buf, sizeof(buf), sa))
- serf_config_set_stringf(config, SERF_CONFIG_CONN_REMOTEIP,
- "%s:%d", buf, sa->port);
- }
-}
-
static apr_status_t connect_connection(serf_connection_t *conn)
{
serf_context_t *ctx = conn->ctx;
apr_status_t status;
- store_ipaddresses_in_config(conn->config, conn->skt);
+ serf_pump__store_ipaddresses_in_config(&conn->pump);
serf__log(LOGLVL_DEBUG, LOGCOMP_CONN, __FILE__, conn->config,
"socket for conn 0x%x connected\n", conn);
@@ -581,6 +468,9 @@ apr_status_t serf__open_connections(serf
if (status)
return status;
+ serf_pump__init(&conn->pump, &conn->io, skt, conn->config,
+ conn->allocator, conn->pool);
+
/* Flag our pollset as dirty now that we have a new socket. */
serf_io__set_pollset_dirty(&conn->io);
@@ -603,7 +493,7 @@ static apr_status_t no_more_writes(serf_
"stop writing on conn 0x%x\n", conn);
/* Clear our iovec. */
- conn->vec_len = 0;
+ conn->pump.vec_len = 0;
/* Update the pollset to know we don't want to write on this socket any
* more.
@@ -717,22 +607,22 @@ static apr_status_t reset_connection(ser
}
}
- if (conn->stream != NULL) {
- serf_bucket_destroy(conn->stream);
- conn->stream = NULL;
+ if (conn->pump.stream != NULL) {
+ serf_bucket_destroy(conn->pump.stream);
+ conn->pump.stream = NULL;
}
/* Don't try to resume any writes */
- conn->vec_len = 0;
+ conn->pump.vec_len = 0;
serf_io__set_pollset_dirty(&conn->io);
conn->state = SERF_CONN_INIT;
- conn->hit_eof = 0;
+ conn->pump.hit_eof = 0;
conn->connect_time = 0;
conn->latency = -1;
- conn->stop_writing = 0;
- conn->write_now = 0;
+ conn->pump.stop_writing = false;
+ conn->write_now = false;
/* conn->pipelining */
conn->framing_type = SERF_CONNECTION_FRAMING_TYPE_HTTP1;
@@ -760,7 +650,7 @@ static apr_status_t reset_connection(ser
return APR_SUCCESS;
}
-static apr_status_t socket_writev(serf_connection_t *conn)
+static apr_status_t socket_writev(serf_pump_t *conn)
{
apr_size_t written;
apr_status_t status;
@@ -805,7 +695,7 @@ static apr_status_t socket_writev(serf_c
serf__log_nopref(LOGLVL_DEBUG, LOGCOMP_RAWMSG, conn->config, "\n");
/* Log progress information */
- serf__context_progress_delta(conn->ctx, 0, written);
+ serf__context_progress_delta(conn->io->ctx, 0, written);
}
return status;
@@ -818,14 +708,14 @@ apr_status_t serf__connection_flush(serf
apr_status_t read_status = APR_SUCCESS;
serf_bucket_t *ostreamh = NULL;
- conn->hit_eof = FALSE;
+ conn->pump.hit_eof = FALSE;
while (status == APR_SUCCESS) {
/* First try to write out what is already stored in the
connection vecs. */
- while (conn->vec_len && !status) {
- status = socket_writev(conn);
+ while (conn->pump.vec_len && !status) {
+ status = socket_writev(&conn->pump);
/* If the write would have blocked, then we're done.
* Don't try to write anything else to the socket.
@@ -838,7 +728,7 @@ apr_status_t serf__connection_flush(serf
if (status || !pump)
return status;
- else if (read_status || conn->vec_len || conn->hit_eof)
+ else if (read_status || conn->pump.vec_len || conn->pump.hit_eof)
return read_status;
/* Ok, with the vecs written, we can now refill the per connection
@@ -856,12 +746,12 @@ apr_status_t serf__connection_flush(serf
data as available, we probably don't want to read ALL_AVAIL, but
a lower number, like the size of one or a few TCP packets, the
available TCP buffer size ... */
- conn->hit_eof = 0;
+ conn->pump.hit_eof = 0;
read_status = serf_bucket_read_iovec(ostreamh,
SERF_READ_ALL_AVAIL,
IOV_MAX,
- conn->vec,
- &conn->vec_len);
+ conn->pump.vec,
+ &conn->pump.vec_len);
if (read_status == SERF_ERROR_WAIT_CONN) {
/* The bucket told us that it can't provide more data until
@@ -873,7 +763,7 @@ apr_status_t serf__connection_flush(serf
we can actually write something. otherwise, we could
end up in a CPU spin: socket wants something, but we
don't have anything (and keep returning EAGAIN) */
- conn->stop_writing = 1;
+ conn->pump.stop_writing = 1;
serf_io__set_pollset_dirty(&conn->io);
read_status = APR_EAGAIN;
@@ -881,7 +771,7 @@ apr_status_t serf__connection_flush(serf
else if (APR_STATUS_IS_EAGAIN(read_status)) {
/* We read some stuff, but did we read everything ? */
- if (conn->hit_eof)
+ if (conn->pump.hit_eof)
read_status = APR_SUCCESS;
}
else if (SERF_BUCKET_READ_ERROR(read_status)) {
@@ -1087,7 +977,7 @@ static apr_status_t handle_async_respons
if (conn->current_async_response == NULL) {
conn->current_async_response =
- (*conn->async_acceptor)(NULL, conn->stream,
+ (*conn->async_acceptor)(NULL, conn->pump.stream,
conn->async_acceptor_baton, pool);
}
@@ -1121,8 +1011,8 @@ static apr_status_t read_from_connection
/* If the stop_writing flag was set on the connection, reset it now because
there is some data to read. */
- if (conn->stop_writing) {
- conn->stop_writing = 0;
+ if (conn->pump.stop_writing) {
+ conn->pump.stop_writing = false;
serf_io__set_pollset_dirty(&conn->io);
}
@@ -1182,7 +1072,7 @@ static apr_status_t read_from_connection
const char *data;
apr_size_t len;
- status = serf_bucket_peek(conn->stream, &data, &len);
+ status = serf_bucket_peek(conn->pump.stream, &data, &len);
if (APR_STATUS_IS_EOF(status)) {
reset_connection(conn, 1);
@@ -1199,7 +1089,7 @@ static apr_status_t read_from_connection
/* Unexpected response from the server */
if (conn->write_now) {
- conn->write_now = 0;
+ conn->write_now = false;
status = conn->perform_write(conn);
if (!SERF_BUCKET_READ_ERROR(status))
@@ -1220,7 +1110,7 @@ static apr_status_t read_from_connection
return SERF_ERROR_BAD_HTTP_RESPONSE;
}
- request->resp_bkt = (*request->acceptor)(request, conn->stream,
+ request->resp_bkt = (*request->acceptor)(request,
conn->pump.stream,
request->acceptor_baton,
tmppool);
apr_pool_clear(tmppool);
@@ -1554,19 +1444,14 @@ serf_connection_t *serf_connection_creat
conn->closed_baton = closed_baton;
conn->pool = pool;
conn->allocator = serf_bucket_allocator_create(pool, NULL, NULL);
- conn->stream = NULL;
- conn->ostream_head = NULL;
- conn->ostream_tail = NULL;
conn->io.type = SERF_IO_CONN;
conn->io.u.conn = conn;
conn->io.ctx = ctx;
conn->io.dirty_conn = false;
conn->io.reqevents = 0;
- conn->hit_eof = 0;
conn->state = SERF_CONN_INIT;
conn->latency = -1; /* unknown */
- conn->stop_writing = 0;
- conn->write_now = 0;
+ conn->write_now = false;
conn->wait_for_connect = 0;
conn->pipelining = 1;
conn->framing_type = SERF_CONNECTION_FRAMING_TYPE_HTTP1;
@@ -1698,9 +1583,9 @@ apr_status_t serf_connection_close(
handle_conn_closed(conn, status);
}
}
- if (conn->stream != NULL) {
- serf_bucket_destroy(conn->stream);
- conn->stream = NULL;
+ if (conn->pump.stream != NULL) {
+ serf_bucket_destroy(conn->pump.stream);
+ conn->pump.stream = NULL;
}
if (conn->protocol_baton) {
@@ -1782,8 +1667,8 @@ void serf_connection_set_framing_type(
if (conn->skt) {
serf_io__set_pollset_dirty(&conn->io);
- conn->stop_writing = 0;
- conn->write_now = 1;
+ conn->pump.stop_writing = 0;
+ conn->write_now = true;
/* Close down existing protocol */
if (conn->protocol_baton) {
Modified: serf/trunk/protocols/fcgi_protocol.c
URL:
http://svn.apache.org/viewvc/serf/trunk/protocols/fcgi_protocol.c?rev=1715022&r1=1715021&r2=1715022&view=diff
==============================================================================
--- serf/trunk/protocols/fcgi_protocol.c (original)
+++ serf/trunk/protocols/fcgi_protocol.c Wed Nov 18 16:56:09 2015
@@ -500,7 +500,7 @@ static apr_status_t fcgi_outgoing_read(s
serf_fcgi_protocol_t *fcgi = conn->protocol_baton;
if (!fcgi->stream)
- fcgi->stream = conn->stream;
+ fcgi->stream = conn->pump.stream;
return fcgi_read(fcgi);
}
@@ -537,8 +537,8 @@ void serf__fcgi_protocol_init(serf_conne
fcgi->pool = protocol_pool;
fcgi->conn = conn;
fcgi->io = &conn->io;
- fcgi->stream = conn->stream;
- fcgi->ostream = conn->ostream_tail;
+ fcgi->stream = conn->pump.stream;
+ fcgi->ostream = conn->pump.ostream_tail;
fcgi->allocator = conn->allocator;
fcgi->config = conn->config;
Modified: serf/trunk/protocols/http2_protocol.c
URL:
http://svn.apache.org/viewvc/serf/trunk/protocols/http2_protocol.c?rev=1715022&r1=1715021&r2=1715022&view=diff
==============================================================================
--- serf/trunk/protocols/http2_protocol.c (original)
+++ serf/trunk/protocols/http2_protocol.c Wed Nov 18 16:56:09 2015
@@ -239,8 +239,8 @@ void serf__http2_protocol_init(serf_conn
h2->pool = protocol_pool;
h2->conn = conn;
h2->io = &conn->io;
- h2->stream = conn->stream;
- h2->ostream = conn->ostream_tail;
+ h2->stream = conn->pump.stream;
+ h2->ostream = conn->pump.ostream_tail;
h2->allocator = conn->allocator;
h2->config = conn->config;
@@ -1644,14 +1644,14 @@ http2_outgoing_read(serf_connection_t *c
/* If the stop_writing flag was set on the connection, reset it now because
there is some data to read. */
- if (conn->stop_writing)
+ if (conn->pump.stop_writing)
{
- conn->stop_writing = 0;
+ conn->pump.stop_writing = false;
serf_io__set_pollset_dirty(&conn->io);
}
if (h2->stream == NULL)
- h2->stream = conn->stream;
+ h2->stream = conn->pump.stream;
status = http2_process(h2);
Modified: serf/trunk/pump.c
URL:
http://svn.apache.org/viewvc/serf/trunk/pump.c?rev=1715022&r1=1715021&r2=1715022&view=diff
==============================================================================
--- serf/trunk/pump.c (original)
+++ serf/trunk/pump.c Wed Nov 18 16:56:09 2015
@@ -58,11 +58,37 @@ void serf_pump__init(serf_pump_t *pump,
pump->allocator = allocator;
pump->config = config;
pump->skt = skt;
+ pump->pool = pool;
apr_pool_cleanup_register(pool, pump, pump_cleanup,
apr_pool_cleanup_null);
}
+void serf_pump__done(serf_pump_t *pump)
+{
+ if (pump->pool) {
+ apr_pool_cleanup_run(pump->pool, pump, pump_cleanup);
+ }
+
+ pump->io = NULL;
+ pump->allocator = NULL;
+ pump->config = NULL;
+
+ /* pump->stream is managed by the current reader! */
+
+ pump->ostream_head = NULL;
+ pump->ostream_tail = NULL;
+
+ pump->skt = NULL;
+ pump->vec_len = 0;
+
+ pump->done_writing = false;
+ pump->stop_writing = false;
+ pump->hit_eof = false;
+
+ pump->pool = NULL;
+}
+
/* Safely check if there is still data pending on the connection, carefull
to not accidentally make it invalid. */
bool serf_pump__data_pending(serf_pump_t *pump)
@@ -118,8 +144,10 @@ void serf_pump__prepare_setup(serf_pump_
}
void serf_pump__complete_setup(serf_pump_t *pump,
+ serf_bucket_t *stream,
serf_bucket_t *ostream)
{
+ pump->stream = stream;
if (ostream)
serf_bucket_aggregate_append(pump->ostream_head, ostream);
else
Modified: serf/trunk/serf_private.h
URL:
http://svn.apache.org/viewvc/serf/trunk/serf_private.h?rev=1715022&r1=1715021&r2=1715022&view=diff
==============================================================================
--- serf/trunk/serf_private.h (original)
+++ serf/trunk/serf_private.h Wed Nov 18 16:56:09 2015
@@ -147,14 +147,18 @@ typedef struct serf_io_baton_t {
} serf_io_baton_t;
-typedef struct serf_pump_io_t
+typedef struct serf_pump_t
{
serf_io_baton_t *io;
serf_bucket_alloc_t *allocator;
serf_config_t *config;
+ /* The incoming stream. Stored here for easy access by users,
+ but not managed as part of the pump */
serf_bucket_t *stream;
+
+ /* The outgoing stream */
serf_bucket_t *ostream_head;
serf_bucket_t *ostream_tail;
@@ -171,6 +175,8 @@ typedef struct serf_pump_io_t
/* Set to true when ostream_tail was read to EOF */
bool hit_eof;
+
+ apr_pool_t *pool;
} serf_pump_t;
@@ -442,7 +448,6 @@ struct serf_incoming_t {
void(*perform_teardown)(serf_incoming_t *conn);
void *protocol_baton;
-
serf_config_t *config;
serf_bucket_t *proto_peek_bkt;
@@ -464,6 +469,7 @@ struct serf_connection_t {
apr_status_t status;
serf_io_baton_t io;
+ serf_pump_t pump;
apr_pool_t *pool;
serf_bucket_alloc_t *allocator;
@@ -496,14 +502,6 @@ struct serf_connection_t {
serf_response_handler_t async_handler;
void *async_handler_baton;
- /* A bucket wrapped around our socket (for reading responses). */
- serf_bucket_t *stream;
- /* A reference to the aggregate bucket that provides the boundary between
- * request level buckets and connection level buckets.
- */
- serf_bucket_t *ostream_head;
- serf_bucket_t *ostream_tail;
-
/* Aggregate bucket used to send the CONNECT request. */
serf_bucket_t *ssltunnel_ostream;
@@ -524,9 +522,6 @@ struct serf_connection_t {
serf_request_t *done_reqs;
serf_request_t *done_reqs_tail;
- struct iovec vec[IOV_MAX];
- int vec_len;
-
serf_connection_setup_t setup;
void *setup_baton;
serf_connection_closed_t closed;
@@ -542,8 +537,6 @@ struct serf_connection_t {
only. */
int pipelining;
- int hit_eof;
-
/* Host url, path ommitted, syntax: https://svn.apache.org . */
const char *host_url;
@@ -560,11 +553,8 @@ struct serf_connection_t {
/* Calculated connection latency. Negative value if latency is unknown. */
apr_interval_time_t latency;
- /* Needs to read first before we can write again. */
- int stop_writing;
-
/* Write out information now */
- int write_now;
+ bool write_now;
/* Wait for connect: connect() returned APR_EINPROGRESS.
Socket not usable yet */
@@ -769,6 +759,8 @@ void serf_pump__init(serf_pump_t *pump,
serf_bucket_alloc_t *allocator,
apr_pool_t *pool);
+void serf_pump__done(serf_pump_t *pump);
+
bool serf_pump__data_pending(serf_pump_t *pump);
void serf_pump__store_ipaddresses_in_config(serf_pump_t *pump);
@@ -777,7 +769,9 @@ apr_status_t serf_pump__write(serf_pump_
/* These must always be called as a pair to avoid a memory leak */
void serf_pump__prepare_setup(serf_pump_t *pump);
-void serf_pump__complete_setup(serf_pump_t *pump, serf_bucket_t *ostream);
+void serf_pump__complete_setup(serf_pump_t *pump,
+ serf_bucket_t *stream,
+ serf_bucket_t *ostream);
/** Logging functions. **/
Modified: serf/trunk/ssltunnel.c
URL:
http://svn.apache.org/viewvc/serf/trunk/ssltunnel.c?rev=1715022&r1=1715021&r2=1715022&view=diff
==============================================================================
--- serf/trunk/ssltunnel.c (original)
+++ serf/trunk/ssltunnel.c Wed Nov 18 16:56:09 2015
@@ -111,8 +111,8 @@ static apr_status_t handle_response(serf
apr_pool_destroy(ctx->pool);
serf_bucket_destroy(conn->ssltunnel_ostream);
conn->ssltunnel_ostream = NULL;
- serf_bucket_destroy(conn->stream);
- conn->stream = NULL;
+ serf_bucket_destroy(conn->pump.stream);
+ conn->pump.stream = NULL;
ctx = NULL;
serf__log(LOGLVL_INFO, LOGCOMP_CONN, __FILE__, conn->config,
@@ -173,7 +173,7 @@ static apr_status_t setup_request(serf_r
static apr_status_t detect_eof(void *baton, serf_bucket_t *aggregate_bucket)
{
serf_connection_t *conn = baton;
- conn->hit_eof = 1;
+ conn->pump.hit_eof = true;
return APR_EAGAIN;
}
@@ -182,6 +182,7 @@ apr_status_t serf__ssltunnel_connect(ser
{
req_ctx_t *ctx;
apr_pool_t *ssltunnel_pool;
+ serf_bucket_t *ssltunnel_ostream;
apr_pool_create(&ssltunnel_pool, conn->pool);
@@ -190,8 +191,10 @@ apr_status_t serf__ssltunnel_connect(ser
ctx->uri = apr_psprintf(ctx->pool, "%s:%d", conn->host_info.hostname,
conn->host_info.port);
- conn->ssltunnel_ostream = serf_bucket_aggregate_create(conn->allocator);
- serf_bucket_aggregate_hold_open(conn->ssltunnel_ostream, detect_eof, conn);
+ ssltunnel_ostream = serf_bucket_aggregate_create(conn->allocator);
+ serf_bucket_aggregate_hold_open(ssltunnel_ostream, detect_eof, conn);
+
+ conn->ssltunnel_ostream = ssltunnel_ostream;
serf__ssltunnel_request_create(conn,
setup_request,