Author: rhuijben
Date: Wed Nov 18 10:42:45 2015
New Revision: 1714965
URL: http://svn.apache.org/viewvc?rev=1714965&view=rev
Log:
Improve cleanup on incoming fcgi connections to allow using the same
connection for multiple requests with mod_proxy_fcgi.
* config_store.c
(serf__config_store_remove_connection): Add comment.
(serf__config_store_remove_client): New function.
(serf__config_store_remove_host): Add comment.
* incoming.c
(destroy_request): Rename to...
(serf__incoming_request_destroy): ... this.
(response_finished): Update caller.
(read_from_client): Update caller. Don't remove listener from
context here, but delay that to serf__incoming_update_pollset()
where we can safely destroy the pool.
(serf_incoming_create2): Remove init of the removed ssltunnel_ostream field.
(serf__incoming_update_pollset): Destroy listener data when done.
* protocols/fcgi_protocol.c
(fcgi_process): Update caller.
(move_to_head): Remove function.
(serf_fcgi__stream_get): Remove the move_first argument. (Clients typically
don't use multiplexing, so it is not worth optimizing for.)
(serf_fcgi__close_stream): New function.
* protocols/fcgi_protocol.h
(serf_fcgi__stream_get): Update prototype.
(serf_fcgi__close_stream): New function.
(serf_fcgi__stream_destroy): New function.
* protocols/fcgi_stream.c
(serf_fcgi_stream_data_t): Embed serf_fcgi_stream_t so that everything lives
in one allocation.
(serf_fcgi__stream_create): Update init.
(serf_fcgi__stream_destroy): New function.
(close_stream): New function.
(fcgi_stream_enqueue_response): Hook close_stream to run when the
END_REQUEST record is destroyed.
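* serf_private.h
(serf__config_store_remove_client): New prototype.
(serf_incoming_t): Remove ssltunnel_ostream.
(serf__incoming_request_destroy): New prototype.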
Modified:
serf/trunk/config_store.c
serf/trunk/incoming.c
serf/trunk/protocols/fcgi_protocol.c
serf/trunk/protocols/fcgi_protocol.h
serf/trunk/protocols/fcgi_stream.c
serf/trunk/serf_private.h
Modified: serf/trunk/config_store.c
URL:
http://svn.apache.org/viewvc/serf/trunk/config_store.c?rev=1714965&r1=1714964&r2=1714965&view=diff
==============================================================================
--- serf/trunk/config_store.c (original)
+++ serf/trunk/config_store.c Wed Nov 18 10:42:45 2015
@@ -225,14 +225,22 @@ apr_status_t
serf__config_store_remove_connection(serf__config_store_t config_store,
serf_connection_t *conn)
{
- return APR_ENOTIMPL;
+ return APR_ENOTIMPL; /* Mem leak? */
}
apr_status_t
+serf__config_store_remove_client(serf__config_store_t config_store,
+ serf_incoming_t *client)
+{
+ return APR_ENOTIMPL; /* Mem leak? */
+}
+
+
+apr_status_t
serf__config_store_remove_host(serf__config_store_t config_store,
const char *hostname_port)
{
- return APR_ENOTIMPL;
+ return APR_ENOTIMPL; /* Mem leak? */
}
/*** Config ***/
Modified: serf/trunk/incoming.c
URL:
http://svn.apache.org/viewvc/serf/trunk/incoming.c?rev=1714965&r1=1714964&r2=1714965&view=diff
==============================================================================
--- serf/trunk/incoming.c (original)
+++ serf/trunk/incoming.c Wed Nov 18 10:42:45 2015
@@ -113,14 +113,12 @@ static apr_status_t client_connected(ser
}
/* Destroy an incoming request and its resources */
-static apr_status_t destroy_request(serf_incoming_request_t *request)
+void serf__incoming_request_destroy(serf_incoming_request_t *request)
{
serf_incoming_t *incoming = request->incoming;
apr_pool_destroy(request->pool);
serf_bucket_mem_free(incoming->allocator, request);
-
- return APR_SUCCESS;
}
/* Called when the response is completely written and the write bucket
@@ -129,14 +127,13 @@ static apr_status_t response_finished(vo
apr_uint64_t bytes_written)
{
serf_incoming_request_t *request = baton;
- apr_status_t status = APR_SUCCESS;
request->response_finished = true;
if (request->request_read && request->response_finished) {
- status = destroy_request(request);
+ serf__incoming_request_destroy(request);
}
- return status;
+ return APR_SUCCESS;
}
static apr_status_t http1_enqueue_reponse(serf_incoming_request_t *request,
@@ -351,7 +348,7 @@ static apr_status_t read_from_client(ser
client->current_request = NULL;
if (rq->request_read && rq->response_finished) {
- status = destroy_request(rq);
+ serf__incoming_request_destroy(rq);
}
/* Is the connection at eof or just the request? */
@@ -369,7 +366,6 @@ static apr_status_t read_from_client(ser
{
apr_pollfd_t tdesc = { 0 };
- int i;
/* Remove us from the pollset */
tdesc.desc_type = APR_POLL_SOCKET;
@@ -378,17 +374,14 @@ static apr_status_t read_from_client(ser
client->ctx->pollset_rm(client->ctx->pollset_baton,
&tdesc, &client->baton);
- /* And from the incommings list */
- for (i = 0; i < client->ctx->incomings->nelts; i++) {
- if (GET_INCOMING(client->ctx, i) == client) {
- GET_INCOMING(client->ctx, i)
- = GET_INCOMING(client->ctx,
- client->ctx->incomings->nelts - 1);
- break;
- }
- }
- client->ctx->incomings->nelts--;
client->seen_in_pollset |= APR_POLLHUP; /* No more events */
+
+ /* Note that the client is done. The pool containing skt
+ and this listener will now be cleared from the context
+ handlers dirty pollset support */
+ client->skt = NULL;
+ client->dirty_conn = true;
+ client->ctx->dirty_pollset = true;
}
status = client->closed(client, client->closed_baton, status,
@@ -760,7 +753,6 @@ apr_status_t serf_incoming_create2(
ic->stream = NULL;
ic->ostream_head = NULL;
ic->ostream_tail = NULL;
- ic->ssltunnel_ostream = NULL;
ic->protocol_baton = NULL;
ic->perform_read = read_from_client;
@@ -872,24 +864,52 @@ apr_status_t serf_listener_create(
return APR_SUCCESS;
}
-apr_status_t serf__incoming_update_pollset(serf_incoming_t *incoming)
+apr_status_t serf__incoming_update_pollset(serf_incoming_t *client)
{
- serf_context_t *ctx = incoming->ctx;
+ serf_context_t *ctx = client->ctx;
apr_status_t status;
apr_pollfd_t desc = { 0 };
bool data_waiting;
- if (!incoming->skt) {
+ if (!client->skt) {
+ int cid;
+ /* We are in the proces of being cleaned up. As we are not
+ in the event loop and already notified the close callback
+ we can now clear our pool and remove us from the context */
+
+ if (client->config)
+ serf__config_store_remove_client(ctx->config_store, client);
+
+ /* And from the incommings list */
+ for (cid = 0; cid < ctx->incomings->nelts; cid++) {
+ if (GET_INCOMING(ctx, cid) == client) {
+ GET_INCOMING(ctx, cid) =
+ GET_INCOMING(ctx,
+ ctx->incomings->nelts - 1);
+ break;
+ }
+ }
+ client->ctx->incomings->nelts--;
+
+ apr_pool_destroy(client->pool);
+
+ if (cid >= ctx->incomings->nelts) {
+ /* We skipped updating the pollset on this item as we moved it.
+ Let's run it now */
+
+ return serf__incoming_update_pollset(GET_INCOMING(ctx, cid));
+ }
+
return APR_SUCCESS;
}
/* Remove the socket from the poll set. */
desc.desc_type = APR_POLL_SOCKET;
- desc.desc.s = incoming->skt;
- desc.reqevents = incoming->reqevents;
+ desc.desc.s = client->skt;
+ desc.reqevents = client->reqevents;
status = ctx->pollset_rm(ctx->pollset_baton,
- &desc, &incoming->baton);
+ &desc, &client->baton);
if (status && !APR_STATUS_IS_NOTFOUND(status))
return status;
@@ -897,7 +917,7 @@ apr_status_t serf__incoming_update_polls
desc.reqevents = APR_POLLIN | APR_POLLHUP | APR_POLLERR;
/* If we are not connected yet, we just want to know when we are */
- if (incoming->wait_for_connect) {
+ if (client->wait_for_connect) {
data_waiting = true;
desc.reqevents |= APR_POLLOUT;
}
@@ -909,7 +929,7 @@ apr_status_t serf__incoming_update_polls
But it also has the nice side effect of removing references
from the aggregate to requests that are done.
*/
- if (incoming->vec_len) {
+ if (client->vec_len) {
/* We still have vecs in the connection, which lifetime is
managed by buckets inside client->ostream_head.
@@ -920,10 +940,7 @@ apr_status_t serf__incoming_update_polls
else {
serf_bucket_t *ostream;
- ostream = incoming->ostream_head;
-
- if (!ostream)
- ostream = incoming->ssltunnel_ostream;
+ ostream = client->ostream_head;
if (ostream) {
const char *dummy_data;
@@ -951,11 +968,11 @@ apr_status_t serf__incoming_update_polls
}
/* save our reqevents, so we can pass it in to remove later. */
- incoming->reqevents = desc.reqevents;
+ client->reqevents = desc.reqevents;
/* Note: even if we don't want to read/write this socket, we still
* want to poll it for hangups and errors.
*/
return ctx->pollset_add(ctx->pollset_baton,
- &desc, &incoming->baton);
+ &desc, &client->baton);
}
Modified: serf/trunk/protocols/fcgi_protocol.c
URL:
http://svn.apache.org/viewvc/serf/trunk/protocols/fcgi_protocol.c?rev=1714965&r1=1714964&r2=1714965&view=diff
==============================================================================
--- serf/trunk/protocols/fcgi_protocol.c (original)
+++ serf/trunk/protocols/fcgi_protocol.c Wed Nov 18 10:42:45 2015
@@ -226,13 +226,13 @@ static apr_status_t fcgi_process(serf_fc
switch (frametype)
{
case FCGI_FRAMETYPE(FCGI_V1, FCGI_BEGIN_REQUEST):
- stream = serf_fcgi__stream_get(fcgi, sid, FALSE, FALSE);
+ stream = serf_fcgi__stream_get(fcgi, sid, false);
if (stream) {
/* Stream must be new */
return SERF_ERROR_FCGI_PROTOCOL_ERROR;
}
- stream = serf_fcgi__stream_get(fcgi, sid, TRUE, TRUE);
+ stream = serf_fcgi__stream_get(fcgi, sid, true);
remaining = (apr_size_t)serf_bucket_get_remaining(body);
if (remaining != sizeof(FCGI_BeginRequestBody)) {
@@ -254,7 +254,7 @@ static apr_status_t fcgi_process(serf_fc
process_bucket = body;
break;
case FCGI_FRAMETYPE(FCGI_V1, FCGI_PARAMS):
- stream = serf_fcgi__stream_get(fcgi, sid, FALSE, FALSE);
+ stream = serf_fcgi__stream_get(fcgi, sid, false);
if (!stream) {
return SERF_ERROR_FCGI_PROTOCOL_ERROR;
}
@@ -275,7 +275,7 @@ static apr_status_t fcgi_process(serf_fc
}
break;
case FCGI_FRAMETYPE(FCGI_V1, FCGI_STDIN):
- stream = serf_fcgi__stream_get(fcgi, sid, FALSE, FALSE);
+ stream = serf_fcgi__stream_get(fcgi, sid, false);
if (!stream) {
return SERF_ERROR_FCGI_PROTOCOL_ERROR;
}
@@ -430,17 +430,10 @@ static apr_status_t fcgi_teardown(serf_f
return APR_ENOTIMPL;
}
-static void
-move_to_head(serf_fcgi_stream_t *stream)
-{
- /* Not implemented yet */
-}
-
serf_fcgi_stream_t *
serf_fcgi__stream_get(serf_fcgi_protocol_t *fcgi,
apr_uint16_t streamid,
- bool create,
- bool move_first)
+ bool create)
{
serf_fcgi_stream_t *stream;
@@ -450,12 +443,7 @@ serf_fcgi__stream_get(serf_fcgi_protocol
for (stream = fcgi->first; stream; stream = stream->next)
{
if (stream->streamid == streamid)
- {
- if (move_first && stream != fcgi->first)
- move_to_head(stream);
-
return stream;
- }
}
if (create)
@@ -476,6 +464,24 @@ serf_fcgi__stream_get(serf_fcgi_protocol
return NULL;
}
+void serf_fcgi__close_stream(serf_fcgi_protocol_t *fcgi,
+ serf_fcgi_stream_t *stream)
+{
+ if (!stream->prev)
+ fcgi->first = stream->next;
+ else
+ stream->prev->next = stream;
+
+ if (stream->next)
+ stream->next->prev = stream->prev;
+ else
+ fcgi->last = stream->prev;
+
+ fcgi->first = fcgi->last = NULL;
+
+ serf_fcgi__stream_destroy(stream);
+}
+
apr_status_t serf_fcgi__setup_incoming_request(
serf_incoming_request_t **in_request,
serf_incoming_request_setup_t *req_setup,
Modified: serf/trunk/protocols/fcgi_protocol.h
URL:
http://svn.apache.org/viewvc/serf/trunk/protocols/fcgi_protocol.h?rev=1714965&r1=1714964&r2=1714965&view=diff
==============================================================================
--- serf/trunk/protocols/fcgi_protocol.h (original)
+++ serf/trunk/protocols/fcgi_protocol.h Wed Nov 18 10:42:45 2015
@@ -185,8 +185,7 @@ typedef apr_status_t(*serf_fcgi_processo
/* From fcgi_protocol.c */
serf_fcgi_stream_t * serf_fcgi__stream_get(serf_fcgi_protocol_t *fcgi,
apr_uint16_t streamid,
- bool create,
- bool move_first);
+ bool create);
apr_status_t serf_fcgi__setup_incoming_request(
@@ -199,6 +198,9 @@ apr_status_t serf_fcgi__enqueue_frame(se
serf_bucket_t *frame,
bool pump);
+void serf_fcgi__close_stream(serf_fcgi_protocol_t *fcgi,
+ serf_fcgi_stream_t *stream);
+
/* From fcgi_stream.c */
serf_fcgi_stream_t * serf_fcgi__stream_create(serf_fcgi_protocol_t *fcgi,
@@ -219,6 +221,7 @@ serf_bucket_t * serf_fcgi__stream_handle
serf_config_t *config,
serf_bucket_alloc_t *alloc);
+void serf_fcgi__stream_destroy(serf_fcgi_stream_t *stream);
#ifdef __cplusplus
Modified: serf/trunk/protocols/fcgi_stream.c
URL:
http://svn.apache.org/viewvc/serf/trunk/protocols/fcgi_stream.c?rev=1714965&r1=1714964&r2=1714965&view=diff
==============================================================================
--- serf/trunk/protocols/fcgi_stream.c (original)
+++ serf/trunk/protocols/fcgi_stream.c Wed Nov 18 10:42:45 2015
@@ -31,8 +31,11 @@
#include "protocols/fcgi_buckets.h"
#include "protocols/fcgi_protocol.h"
+/* Fully opaque variant of serf_fcgi_stream_t */
struct serf_fcgi_stream_data_t
{
+ serf_fcgi_stream_t stream_data;
+
serf_bucket_t *req_agg;
bool headers_eof;
bool stdin_eof;
@@ -45,8 +48,12 @@ serf_fcgi_stream_t * serf_fcgi__stream_c
apr_uint16_t streamid,
serf_bucket_alloc_t *alloc)
{
- serf_fcgi_stream_t *stream = serf_bucket_mem_alloc(alloc,
- sizeof(*stream));
+ serf_fcgi_stream_t *stream;
+ serf_fcgi_stream_data_t *data = serf_bucket_mem_calloc(alloc,
+ sizeof(*data));
+
+ stream = &data->stream_data;
+ stream->data = data;
stream->fcgi = fcgi;
stream->alloc = alloc;
@@ -54,12 +61,19 @@ serf_fcgi_stream_t * serf_fcgi__stream_c
stream->next = stream->prev = NULL;
- /* Delay creating this? */
- stream->data = serf_bucket_mem_calloc(alloc, sizeof(*stream->data));
-
return stream;
}
+void serf_fcgi__stream_destroy(serf_fcgi_stream_t * stream)
+{
+ if (stream->data->in_request)
+ serf__incoming_request_destroy(stream->data->in_request);
+
+
+ /* Destroy stream and stream->data */
+ serf_bucket_mem_free(stream->alloc, stream);
+}
+
/* Aggregate hold open callback for what requests will think is the
actual body */
static apr_status_t stream_agg_eof(void *baton,
@@ -73,6 +87,16 @@ static apr_status_t stream_agg_eof(void
return APR_EOF;
}
+static apr_status_t close_stream(void *baton,
+ apr_uint64_t bytes_read)
+{
+ serf_fcgi_stream_t *stream = baton;
+
+ serf_fcgi__close_stream(stream->fcgi, stream);
+
+ return APR_SUCCESS;
+}
+
static apr_status_t
fcgi_stream_enqueue_response(serf_incoming_request_t *request,
void *enqueue_baton,
@@ -136,12 +160,15 @@ fcgi_stream_enqueue_response(serf_incomi
/* Send end of request: FCGI_REQUEST_COMPLETE, exit code 0 */
tmp = SERF_BUCKET_SIMPLE_STRING_LEN("\0\0\0\0\0\0\0\0", 8, alloc);
- status = serf_fcgi__enqueue_frame(
- stream->fcgi,
- serf__bucket_fcgi_frame_create(tmp, stream->streamid,
- FCGI_FRAMETYPE(FCGI_V1,
FCGI_END_REQUEST),
- false, false,
- alloc), true);
+ tmp = serf__bucket_fcgi_frame_create(tmp, stream->streamid,
+ FCGI_FRAMETYPE(FCGI_V1,
FCGI_END_REQUEST),
+ false, false,
+ alloc);
+
+ tmp = serf__bucket_event_create(tmp, stream, NULL, NULL,
+ close_stream, alloc);
+
+ status = serf_fcgi__enqueue_frame(stream->fcgi, tmp, true);
return status;
}
Modified: serf/trunk/serf_private.h
URL:
http://svn.apache.org/viewvc/serf/trunk/serf_private.h?rev=1714965&r1=1714964&r2=1714965&view=diff
==============================================================================
--- serf/trunk/serf_private.h (original)
+++ serf/trunk/serf_private.h Wed Nov 18 10:42:45 2015
@@ -294,6 +294,10 @@ apr_status_t
serf__config_store_remove_connection(serf__config_store_t config_store,
serf_connection_t *conn);
+apr_status_t
+serf__config_store_remove_client(serf__config_store_t config_store,
+ serf_incoming_t *client);
+
/* Cleans up all host specific configuration values */
apr_status_t
serf__config_store_remove_host(serf__config_store_t config_store,
@@ -407,9 +411,6 @@ struct serf_incoming_t {
serf_bucket_t *ostream_head;
serf_bucket_t *ostream_tail;
- /* Aggregate bucket used to send the CONNECT request. */
- serf_bucket_t *ssltunnel_ostream;
-
serf_config_t *config;
serf_bucket_t *proto_peek_bkt;
@@ -653,6 +654,7 @@ apr_status_t serf__process_listener(serf
apr_status_t serf__incoming_update_pollset(serf_incoming_t *incoming);
apr_status_t serf__incoming_client_flush(serf_incoming_t *client, bool pump);
serf_incoming_request_t *serf__incoming_request_create(serf_incoming_t
*client);
+void serf__incoming_request_destroy(serf_incoming_request_t *request);
/* from outgoing.c */
apr_status_t serf__open_connections(serf_context_t *ctx);