Author: rhuijben
Date: Wed Nov 18 19:01:54 2015
New Revision: 1715036
URL: http://svn.apache.org/viewvc?rev=1715036&view=rev
Log:
Add a standard method to the pump logic for adding a bucket to the
data waiting to be written, and use it. This consolidates some code
written for the http2 and fcgi protocols.
* outgoing.c
(write_to_connection): Use serf_pump__add_output.
* protocols/fcgi_protocol.c
(serf_fcgi_protocol_t): Just store pump instead of streams.
(fcgi_process): Update usage.
(serf_fcgi__enqueue_frame): Use standard code.
(fcgi_outgoing_read): Remove now unneeded code.
(serf__fcgi_protocol_init): Simplify init.
(fcgi_server_read,
fcgi_server_write,
serf__fcgi_protocol_init_server): Simplify.
* protocols/fcgi_protocol.h
(serf_fcgi__enqueue_frame): Tweak argument name
* protocols/http2_protocol.c
(serf_http2_protocol_t): Store pump instead of streams.
(serf__http2_protocol_init,
serf__http2_protocol_init_server): Simplify init.
(serf_http2__enqueue_frame): Wrap standard code.
(http2_process): Use stream from pump.
(http2_outgoing_read,
http2_incoming_read): Remove init.
* protocols/http2_protocol.h
(serf_http2__enqueue_frame): Tweak argument name and type (int -> bool).
* pump.c
(serf_pump__write): Set pollset dirty if receiving EAGAIN,
and not already asking for writable.
(serf_pump__add_output): New function.
* serf_private.h
(serf_pump__add_output): New function.
Modified:
serf/trunk/outgoing.c
serf/trunk/protocols/fcgi_protocol.c
serf/trunk/protocols/fcgi_protocol.h
serf/trunk/protocols/http2_protocol.c
serf/trunk/protocols/http2_protocol.h
serf/trunk/pump.c
serf/trunk/serf_private.h
Modified: serf/trunk/outgoing.c
URL:
http://svn.apache.org/viewvc/serf/trunk/outgoing.c?rev=1715036&r1=1715035&r2=1715036&view=diff
==============================================================================
--- serf/trunk/outgoing.c (original)
+++ serf/trunk/outgoing.c Wed Nov 18 19:01:54 2015
@@ -777,8 +777,7 @@ static apr_status_t write_to_connection(
request_writing_done,
request_writing_finished,
conn->allocator);
- serf_bucket_aggregate_append(conn->pump.ostream_tail,
- event_bucket);
+ serf_pump__add_output(&conn->pump, event_bucket, false);
}
/* If we got some data, then deliver it. */
Modified: serf/trunk/protocols/fcgi_protocol.c
URL:
http://svn.apache.org/viewvc/serf/trunk/protocols/fcgi_protocol.c?rev=1715036&r1=1715035&r2=1715036&view=diff
==============================================================================
--- serf/trunk/protocols/fcgi_protocol.c (original)
+++ serf/trunk/protocols/fcgi_protocol.c Wed Nov 18 19:01:54 2015
@@ -40,14 +40,12 @@ typedef struct serf_fcgi_protocol_t
serf_incoming_t *client;
serf_io_baton_t *io; /* Low level connection */
+ serf_pump_t *pump;
apr_pool_t *pool;
serf_bucket_alloc_t *allocator;
serf_config_t *config;
- serf_bucket_t *stream;
- serf_bucket_t *ostream;
-
serf_fcgi_processor_t processor;
void *processor_baton;
@@ -175,7 +173,7 @@ static apr_status_t fcgi_process(serf_fc
{
SERF_FCGI_assert(!fcgi->in_frame);
- body = serf__bucket_fcgi_unframe_create(fcgi->stream,
+ body = serf__bucket_fcgi_unframe_create(fcgi->pump->stream,
fcgi->allocator);
serf__bucket_fcgi_unframe_set_eof(body,
@@ -347,52 +345,9 @@ static apr_status_t fcgi_read(serf_fcgi_
apr_status_t serf_fcgi__enqueue_frame(serf_fcgi_protocol_t *fcgi,
serf_bucket_t *frame,
- bool pump)
+ bool flush)
{
- apr_status_t status;
- bool want_write;
-
- if (!pump && !fcgi->io->dirty_conn)
- {
- const char *data;
- apr_size_t len;
-
- /* Cheap check to see if we should request a write
- event next time around */
- status = serf_bucket_peek(fcgi->ostream, &data, &len);
-
- if (SERF_BUCKET_READ_ERROR(status))
- {
- serf_bucket_destroy(frame);
- return status;
- }
-
- if (len == 0)
- {
- serf_io__set_pollset_dirty(fcgi->io);
- }
- }
-
- serf_bucket_aggregate_append(fcgi->ostream, frame);
-
- if (!pump)
- return APR_SUCCESS;
-
- /* Flush final output buffer (after ssl, etc.) */
- if (fcgi->conn)
- status = serf__connection_flush(fcgi->conn, TRUE);
- else
- status = serf__incoming_client_flush(fcgi->client, TRUE);
-
- want_write = APR_STATUS_IS_EAGAIN(status);
-
- if ((want_write && !(fcgi->io->reqevents & APR_POLLOUT))
- || (!want_write && (fcgi->io->reqevents & APR_POLLOUT)))
- {
- serf_io__set_pollset_dirty(fcgi->io);
- }
-
- return status;
+ return serf_pump__add_output(fcgi->pump, frame, flush);
}
static apr_status_t fcgi_write(serf_fcgi_protocol_t *fcgi)
@@ -499,9 +454,6 @@ static apr_status_t fcgi_outgoing_read(s
{
serf_fcgi_protocol_t *fcgi = conn->protocol_baton;
- if (!fcgi->stream)
- fcgi->stream = conn->pump.stream;
-
return fcgi_read(fcgi);
}
@@ -537,8 +489,7 @@ void serf__fcgi_protocol_init(serf_conne
fcgi->pool = protocol_pool;
fcgi->conn = conn;
fcgi->io = &conn->io;
- fcgi->stream = conn->pump.stream;
- fcgi->ostream = conn->pump.ostream_tail;
+ fcgi->pump = &conn->pump;
fcgi->allocator = conn->allocator;
fcgi->config = conn->config;
@@ -561,11 +512,6 @@ static apr_status_t fcgi_server_read(ser
{
serf_fcgi_protocol_t *fcgi = client->protocol_baton;
- if (! fcgi->stream) {
- fcgi->stream = client->pump.stream;
- fcgi->ostream = client->pump.ostream_tail;
- }
-
return fcgi_read(fcgi);
}
@@ -573,11 +519,6 @@ static apr_status_t fcgi_server_write(se
{
serf_fcgi_protocol_t *fcgi = client->protocol_baton;
- if (!fcgi->stream) {
- fcgi->stream = client->pump.stream;
- fcgi->ostream = client->pump.ostream_tail;
- }
-
return fcgi_write(fcgi);
}
@@ -606,8 +547,7 @@ void serf__fcgi_protocol_init_server(ser
fcgi->pool = protocol_pool;
fcgi->client = client;
fcgi->io = &client->io;
- fcgi->stream = client->pump.stream;
- fcgi->ostream = client->pump.ostream_tail;
+ fcgi->pump = &client->pump;
fcgi->allocator = client->allocator;
fcgi->config = client->config;
Modified: serf/trunk/protocols/fcgi_protocol.h
URL:
http://svn.apache.org/viewvc/serf/trunk/protocols/fcgi_protocol.h?rev=1715036&r1=1715035&r2=1715036&view=diff
==============================================================================
--- serf/trunk/protocols/fcgi_protocol.h (original)
+++ serf/trunk/protocols/fcgi_protocol.h Wed Nov 18 19:01:54 2015
@@ -196,7 +196,7 @@ apr_status_t serf_fcgi__setup_incoming_r
apr_status_t serf_fcgi__enqueue_frame(serf_fcgi_protocol_t *fcgi,
serf_bucket_t *frame,
- bool pump);
+ bool flush);
void serf_fcgi__close_stream(serf_fcgi_protocol_t *fcgi,
serf_fcgi_stream_t *stream);
Modified: serf/trunk/protocols/http2_protocol.c
URL:
http://svn.apache.org/viewvc/serf/trunk/protocols/http2_protocol.c?rev=1715036&r1=1715035&r2=1715036&view=diff
==============================================================================
--- serf/trunk/protocols/http2_protocol.c (original)
+++ serf/trunk/protocols/http2_protocol.c Wed Nov 18 19:01:54 2015
@@ -122,8 +122,8 @@ struct serf_http2_protocol_t
serf_incoming_t *client;
serf_io_baton_t *io; /* Low level connection */
+ serf_pump_t *pump;
- serf_bucket_t *stream, *ostream;
serf_bucket_alloc_t *allocator;
serf_http2_processor_t processor;
@@ -239,8 +239,7 @@ void serf__http2_protocol_init(serf_conn
h2->pool = protocol_pool;
h2->conn = conn;
h2->io = &conn->io;
- h2->stream = conn->pump.stream;
- h2->ostream = conn->pump.ostream_tail;
+ h2->pump = &conn->pump;
h2->allocator = conn->allocator;
h2->config = conn->config;
@@ -289,7 +288,7 @@ void serf__http2_protocol_init(serf_conn
/* Send the HTTP/2 Connection Preface */
tmp = SERF_BUCKET_SIMPLE_STRING(HTTP2_CONNECTION_PREFIX, h2->allocator);
- serf_bucket_aggregate_append(h2->ostream, tmp);
+ serf_pump__add_output(h2->pump, tmp, false);
/* And now a settings frame and a huge window */
{
@@ -331,8 +330,7 @@ void serf__http2_protocol_init_server(se
h2->pool = protocol_pool;
h2->client = client;
h2->io = &client->io;
- h2->stream = client->pump.stream;
- h2->ostream = client->pump.ostream_tail;
+ h2->pump = &client->pump;
h2->allocator = client->allocator;
h2->config = client->config;
@@ -432,53 +430,9 @@ enqueue_http2_request(serf_http2_protoco
apr_status_t
serf_http2__enqueue_frame(serf_http2_protocol_t *h2,
serf_bucket_t *frame,
- int pump)
+ bool flush)
{
- apr_status_t status;
- bool want_write;
-
-
- if (!pump && !h2->io->dirty_conn)
- {
- const char *data;
- apr_size_t len;
-
- /* Cheap check to see if we should request a write
- event next time around */
- status = serf_bucket_peek(h2->ostream, &data, &len);
-
- if (SERF_BUCKET_READ_ERROR(status))
- {
- serf_bucket_destroy(frame);
- return status;
- }
-
- if (len == 0)
- {
- serf_io__set_pollset_dirty(h2->io);
- }
- }
-
- serf_bucket_aggregate_append(h2->ostream, frame);
-
- if (!pump)
- return APR_SUCCESS;
-
- /* Flush final output buffer (after ssl, etc.) */
- if (h2->conn)
- status = serf__connection_flush(h2->conn, TRUE);
- else
- status = serf__incoming_client_flush(h2->client, TRUE);
-
- want_write = APR_STATUS_IS_EAGAIN(status);
-
- if ((want_write && !(h2->io->reqevents & APR_POLLOUT))
- || (!want_write && (h2->io->reqevents & APR_POLLOUT)))
- {
- serf_io__set_pollset_dirty(h2->io);
- }
-
- return status;
+ return serf_pump__add_output(h2->pump, frame, flush);
}
/* Implements serf_bucket_prefix_handler_t.
@@ -1072,7 +1026,7 @@ http2_process(serf_http2_protocol_t *h2)
SERF_H2_assert(!h2->in_frame);
body = serf__bucket_http2_unframe_create(
- h2->stream,
+ h2->pump->stream,
h2->rl_max_framesize,
h2->allocator);
@@ -1650,9 +1604,6 @@ http2_outgoing_read(serf_connection_t *c
serf_io__set_pollset_dirty(&conn->io);
}
- if (h2->stream == NULL)
- h2->stream = conn->pump.stream;
-
status = http2_process(h2);
if (!status)
@@ -1765,8 +1716,6 @@ http2_incoming_read(serf_incoming_t *cli
/* Peek buffer is now empty. Use actual stream */
serf_bucket_destroy(client->proto_peek_bkt);
client->proto_peek_bkt = NULL;
-
- h2->stream = client->pump.stream;
}
if (APR_STATUS_IS_EAGAIN(status) || status == SERF_ERROR_WAIT_CONN)
Modified: serf/trunk/protocols/http2_protocol.h
URL:
http://svn.apache.org/viewvc/serf/trunk/protocols/http2_protocol.h?rev=1715036&r1=1715035&r2=1715036&view=diff
==============================================================================
--- serf/trunk/protocols/http2_protocol.h (original)
+++ serf/trunk/protocols/http2_protocol.h Wed Nov 18 19:01:54 2015
@@ -157,7 +157,7 @@ typedef apr_status_t (* serf_http2_proce
apr_status_t
serf_http2__enqueue_frame(serf_http2_protocol_t *h2,
serf_bucket_t *frame,
- int pump);
+ bool flush);
/* Creates a new stream */
serf_http2_stream_t *
Modified: serf/trunk/pump.c
URL:
http://svn.apache.org/viewvc/serf/trunk/pump.c?rev=1715036&r1=1715035&r2=1715036&view=diff
==============================================================================
--- serf/trunk/pump.c (original)
+++ serf/trunk/pump.c Wed Nov 18 19:01:54 2015
@@ -293,8 +293,18 @@ apr_status_t serf_pump__write(serf_pump_
return no_more_writes(pump);
}
- if (status || !fetch_new)
+ if (status || !fetch_new) {
+
+ /* If we couldn't write everything that we tried,
+ make sure that we will receive a write event next time */
+ if (APR_STATUS_IS_EAGAIN(status)
+ && !pump->io->dirty_conn
+ && !(pump->io->reqevents & APR_POLLOUT))
+ {
+ serf_io__set_pollset_dirty(pump->io);
+ }
return status;
+ }
else if (read_status || pump->vec_len || pump->hit_eof)
return read_status;
@@ -340,3 +350,31 @@ apr_status_t serf_pump__write(serf_pump_
return status;
}
+
+apr_status_t serf_pump__add_output(serf_pump_t *pump,
+ serf_bucket_t *bucket,
+ bool flush)
+{
+ if (!flush
+ && !pump->io->dirty_conn
+ && !pump->stop_writing
+ && !(pump->io->reqevents & APR_POLLOUT)
+ && !serf_pump__data_pending(pump))
+ {
+ /* If not writing now,
+ * and not already dirty
+ * and nothing pending yet
+ Then mark the pollset dirty to trigger a write */
+
+ serf_io__set_pollset_dirty(pump->io);
+ }
+
+ serf_bucket_aggregate_append(pump->ostream_tail, bucket);
+
+ if (!flush)
+ return APR_SUCCESS;
+
+ /* Flush final output buffer (after ssl, etc.) */
+ return serf_pump__write(pump, TRUE);
+}
+
Modified: serf/trunk/serf_private.h
URL:
http://svn.apache.org/viewvc/serf/trunk/serf_private.h?rev=1715036&r1=1715035&r2=1715036&view=diff
==============================================================================
--- serf/trunk/serf_private.h (original)
+++ serf/trunk/serf_private.h Wed Nov 18 19:01:54 2015
@@ -762,6 +762,10 @@ void serf_pump__store_ipaddresses_in_con
apr_status_t serf_pump__write(serf_pump_t *pump,
bool fetch_new);
+apr_status_t serf_pump__add_output(serf_pump_t *pump,
+ serf_bucket_t *bucket,
+ bool flush);
+
/* These must always be called as a pair to avoid a memory leak */
void serf_pump__prepare_setup(serf_pump_t *pump);
void serf_pump__complete_setup(serf_pump_t *pump,