Author: rhuijben
Date: Thu Nov 5 14:05:02 2015
New Revision: 1712776
URL: http://svn.apache.org/viewvc?rev=1712776&view=rev
Log:
Extend request write state to an enum that exactly tracks what state
we are in. This allows determining whether it is safe to clear the request
pool and when we certainly shouldn't do this.
This patch fixes the segfaults we are currently seeing on trunk, but
temporarily introduces some other issues. Some connection-level errors
are now routed upwards while the tests expect them to trigger a retry.
I'll fix those problems in a separate patch.
* connection_request.c
(clean_resp): Detect a case where we can't destroy yet and ask the
connection to recover.
(serf__destroy_request): When destroying a request that still has
data waiting in the pipeline, just schedule it for destruction.
(create_request): Initialize writing state. Also initialize a variable that
we didn't set up before; it took time to diagnose why it pointed
to something.
(priority_request_create): Check new state.
(serf_request_is_written): Use new state to give the same info.
* outgoing.c
(writing_queue_empty): New helper function
(request_or_data_pending): Split into two functions...
(data_pending): ... this
(request_pending): ... and this.
(request_or_data_pending): And reimplement this function around these.
(serf__conn_update_pollset): Poll the write collection bucket to
determine whether we should write, and also note when writes are done.
(destroy_ostream): Rename to...
(serf__connection_pre_cleanup): ... this, and also apply other work
that is necessary to ensure the output stream is empty.
And when it is empty, clean out dependencies.
(do_conn_setup): Update caller.
(reset_connection): Clean the output stream before and the input stream
after the requests, to stick to bucket lifetime promises.
(write_to_connection,
read_from_connection): Use new writing state variable.
(serf_connection_create): Explicitly set some variables.
(serf_connection_close): Clean the output stream before and the input stream
after the requests, to stick to bucket lifetime promises.
* serf_private.h
(serf_request_writing_t): New enum.
(serf_request_t): Replace writing_started bool with an enum.
(serf_connection_t): Add a done request list.
Modified:
serf/trunk/connection_request.c
serf/trunk/outgoing.c
serf/trunk/serf_private.h
Modified: serf/trunk/connection_request.c
URL: http://svn.apache.org/viewvc/serf/trunk/connection_request.c?rev=1712776&r1=1712775&r2=1712776&view=diff
==============================================================================
--- serf/trunk/connection_request.c (original)
+++ serf/trunk/connection_request.c Thu Nov 5 14:05:02 2015
@@ -32,35 +32,45 @@
static apr_status_t clean_resp(void *data)
{
serf_request_t *request = data;
+ apr_pool_t *respool = request->respool;
+
+ request->respool = NULL;
/* The request's RESPOOL is being cleared. */
+ if (respool && request->writing >= SERF_WRITING_STARTED
+ && request->writing < SERF_WRITING_FINISHED) {
+
+ /* HOUSTON, WE HAVE A PROBLEM.
+
+ We created buckets inside the pool that is now cleaned and
+ stored them in an aggregate that still lives on.
+
+ This happens when the application decides that it doesn't
+ need the connection any more and clears the pool of the
+ connection, of which the request pool is a subpool.
+
+ Let's ask the connection to take care of things */
+ serf__connection_pre_cleanup(request->conn);
+ }
/* If the response has allocated some buckets, then destroy them (since
the bucket may hold resources other than memory in RESPOOL). Also
make sure to set their fields to NULL so connection closure does
not attempt to free them again. */
if (request->resp_bkt) {
- if (request->respool)
+ if (respool)
serf_debug__closed_conn(request->resp_bkt->allocator);
serf_bucket_destroy(request->resp_bkt);
request->resp_bkt = NULL;
}
if (request->req_bkt) {
- if (request->respool)
+ if (respool)
serf_debug__closed_conn(request->req_bkt->allocator);
- if (!request->writing_started)
+ if (request->writing == SERF_WRITING_NONE)
serf_bucket_destroy(request->req_bkt);
request->req_bkt = NULL;
}
- /* ### should we worry about debug stuff, like that performed in
- ### destroy_request()? should we worry about calling req->handler
- ### to notify this "cancellation" due to pool clearing? */
-
- /* This pool just got cleared/destroyed. Don't try to destroy the pool
- (again) when the request is canceled. */
- request->respool = NULL;
-
return APR_SUCCESS;
}
@@ -81,16 +91,30 @@ apr_status_t serf__destroy_request(serf_
{
serf_connection_t *conn = request->conn;
- if (request->respool) {
- apr_pool_t *pool = request->respool;
+ if (request->writing >= SERF_WRITING_STARTED
+ && request->writing < SERF_WRITING_FINISHED) {
- apr_pool_cleanup_run(pool, request, clean_resp);
- apr_pool_destroy(pool);
+ /* Schedule for destroy when it is safe again.
- serf_debug__bucket_alloc_check(request->allocator);
+ Destroying now will destroy memory of buckets that we
+ may still need.
+ */
+ serf__link_requests(&conn->done_reqs, &conn->done_reqs_tail,
+ request);
}
+ else {
+
+ if (request->respool) {
+ apr_pool_t *pool = request->respool;
- serf_bucket_mem_free(conn->allocator, request);
+ apr_pool_cleanup_run(pool, request, clean_resp);
+ apr_pool_destroy(pool);
+
+ serf_debug__bucket_alloc_check(request->allocator);
+ }
+
+ serf_bucket_mem_free(conn->allocator, request);
+ }
return APR_SUCCESS;
}
@@ -311,11 +335,12 @@ create_request(serf_connection_t *conn,
request->setup = setup;
request->setup_baton = setup_baton;
request->handler = NULL;
+ request->acceptor = NULL;
request->respool = NULL;
request->req_bkt = NULL;
request->resp_bkt = NULL;
request->priority = priority;
- request->writing_started = 0;
+ request->writing = SERF_WRITING_NONE;
request->ssltunnel = ssltunnel;
request->next = NULL;
request->auth_baton = NULL;
@@ -364,7 +389,8 @@ priority_request_create(serf_connection_
/* TODO: what if a request is partially written? */
/* Find a request that has data which needs to be delivered. */
- while (iter != NULL && iter->req_bkt == NULL && iter->writing_started) {
+ while (iter != NULL && iter->req_bkt == NULL
+ && (iter->writing >= SERF_WRITING_STARTED)) {
prev = iter;
iter = iter->next;
}
@@ -446,7 +472,7 @@ apr_status_t serf_request_cancel(serf_re
apr_status_t serf_request_is_written(serf_request_t *request)
{
- if (request->writing_started && !request->req_bkt)
+ if (request->writing >= SERF_WRITING_FINISHED)
return APR_SUCCESS;
return APR_EBUSY;
Modified: serf/trunk/outgoing.c
URL: http://svn.apache.org/viewvc/serf/trunk/outgoing.c?rev=1712776&r1=1712775&r2=1712776&view=diff
==============================================================================
--- serf/trunk/outgoing.c (original)
+++ serf/trunk/outgoing.c Thu Nov 5 14:05:02 2015
@@ -64,17 +64,64 @@ static apr_status_t clean_conn(void *dat
return APR_SUCCESS;
}
-/* Check if there is data waiting to be sent over the socket. This can happen
- in two situations:
- - The connection queue has atleast one request with unwritten data.
- - All requests are written and the ssl layer wrote some data while reading
- the response. This can happen when the server triggers a renegotiation,
- e.g. after the first and only request on that connection was received.
- Returns 1 if data is pending on CONN, NULL if not.
- If NEXT_REQ is not NULL, it will be filled in with the next available request
- with unwritten data. */
+/* Called in different places when the writing queue is empty. At this
+ point it may be safe to destroy some old request instances */
+void writing_queue_empty(serf_connection_t *conn)
+{
+ serf_request_t *rq;
+
+ /* Tell all written request that they are free to destroy themselves */
+ rq = conn->written_reqs;
+ while (rq != NULL) {
+ if (rq->writing == SERF_WRITING_DONE)
+ rq->writing = SERF_WRITING_FINISHED;
+ rq = rq->next;
+ }
+
+ /* Destroy the requests that were queued up to destroy later */
+ while ((rq = conn->done_reqs)) {
+ conn->done_reqs = rq->next;
+
+ rq->writing = SERF_WRITING_FINISHED;
+ serf__destroy_request(rq);
+ }
+ conn->done_reqs = conn->done_reqs_tail = NULL;
+}
+
+/* Safely check if there is still data pending on the connection, carefull
+ to not accidentally make it invalid. */
static int
-request_or_data_pending(serf_request_t **next_req, serf_connection_t *conn)
+data_pending(serf_connection_t *conn)
+{
+ if (conn->vec_len > 0)
+ return TRUE; /* We can't poll right now! */
+
+ if (conn->ostream_head) {
+ const char *dummy;
+ apr_size_t len;
+ apr_status_t status;
+
+ status = serf_bucket_peek(conn->ostream_head, &dummy,
+ &len);
+ if (!SERF_BUCKET_READ_ERROR(status)) {
+ if (len > 0) {
+ serf__log(LOGLVL_DEBUG, LOGCOMP_CONN, __FILE__, conn->config,
+ "Extra data to be written after sending complete "
+ "requests.\n");
+ return TRUE;
+ }
+
+ writing_queue_empty(conn);
+ }
+ else
+ return TRUE; /* Sure, we have data (an error) */
+ }
+
+ return FALSE;
+}
+
+static int
+request_pending(serf_request_t **next_req, serf_connection_t *conn)
{
int reqs_in_progress;
@@ -92,28 +139,31 @@ request_or_data_pending(serf_request_t *
*next_req = request;
if (request != NULL) {
- return 1;
+ return TRUE;
}
}
-
- if (next_req)
+ else if (next_req)
*next_req = NULL;
- if (conn->ostream_head) {
- const char *dummy;
- apr_size_t len;
- apr_status_t status;
+ return FALSE;
+}
- status = serf_bucket_peek(conn->ostream_head, &dummy,
- &len);
- if (!SERF_BUCKET_READ_ERROR(status) && len) {
- serf__log(LOGLVL_DEBUG, LOGCOMP_CONN, __FILE__, conn->config,
- "Extra data to be written after sending complete requests.\n");
- return 1;
- }
- }
+/* Check if there is data waiting to be sent over the socket. This can happen
+ in two situations:
+ - The connection queue has atleast one request with unwritten data.
+ - All requests are written and the ssl layer wrote some data while reading
+ the response. This can happen when the server triggers a renegotiation,
+ e.g. after the first and only request on that connection was received.
+ Returns 1 if data is pending on CONN, NULL if not.
+ If NEXT_REQ is not NULL, it will be filled in with the next available request
+ with unwritten data. */
+static int
+request_or_data_pending(serf_request_t **next_req, serf_connection_t *conn)
+{
+ if (request_pending(next_req, conn))
+ return TRUE;
- return 0;
+ return data_pending(conn);
}
/* Update the pollset for this connection. We tweak the pollset based on
@@ -126,6 +176,7 @@ apr_status_t serf__conn_update_pollset(s
serf_context_t *ctx = conn->ctx;
apr_status_t status;
apr_pollfd_t desc = { 0 };
+ int data_waiting;
if (!conn->skt) {
return APR_SUCCESS;
@@ -146,16 +197,63 @@ apr_status_t serf__conn_update_pollset(s
/* If we are not connected yet, we just want to know when we are */
if (conn->wait_for_connect) {
+ data_waiting = TRUE;
+ }
+ else {
+ /* Directly look at the connection data. While this may look
+ more expensive than the cheap checks later this peek is
+ just checking a bit of ram.
+
+ But it also has the nice side effect of removing references
+ from the aggregate to requests that are done.
+ */
+ if (conn->vec_len) {
+ /* We still have vecs in the connection, which lifetime is
+ managed by buckets inside conn->ostream_head.
+
+ Don't touch ostream as that might destroy the
+ vecs */
+
+ data_waiting = (conn->state != SERF_CONN_CLOSING);
+ }
+ else {
+ serf_bucket_t *ostream;
+ data_waiting = FALSE;
+
+ ostream = conn->ostream_head;
+
+ if (!ostream)
+ ostream = conn->ssltunnel_ostream;
+
+ if (ostream) {
+ const char *dummy_data;
+ apr_size_t len;
+
+ status = serf_bucket_peek(ostream, &dummy_data, &len);
+
+ if (SERF_BUCKET_READ_ERROR(status))
+ data_waiting = TRUE; /* Error waiting */
+ else if (len > 0)
+ data_waiting = TRUE;
+ }
+
+ if (!data_waiting)
+ writing_queue_empty(conn);
+ }
+ }
+
+ if (data_waiting && ! conn->stop_writing) {
desc.reqevents |= APR_POLLOUT;
}
- else if ((conn->written_reqs || conn->unwritten_reqs) &&
+
+ if ((conn->written_reqs || conn->unwritten_reqs) &&
conn->state != SERF_CONN_INIT) {
/* If there are any outstanding events, then we want to read. */
/* ### not true. we only want to read IF we have sent some data */
desc.reqevents |= APR_POLLIN;
/* Don't write if OpenSSL told us that it needs to read data first. */
- if (! conn->stop_writing) {
+ if (! conn->stop_writing && !data_waiting) {
/* If the connection is not closing down and
* has unwritten data or
@@ -174,7 +272,7 @@ apr_status_t serf__conn_update_pollset(s
conn->max_outstanding_requests)) {
/* we wouldn't try to write any way right now. */
}
- else if (request_or_data_pending(NULL, conn)) {
+ else if (request_pending(NULL, conn)) {
desc.reqevents |= APR_POLLOUT;
}
}
@@ -219,13 +317,22 @@ static void check_buckets_drained(serf_c
#endif
-static void destroy_ostream(serf_connection_t *conn)
+/* Destroys all outstanding write information, to allow cleanup of subpools
+ that may still have data in these buckets to continue */
+void serf__connection_pre_cleanup(serf_connection_t *conn)
{
+ conn->vec_len = 0;
if (conn->ostream_head != NULL) {
serf_bucket_destroy(conn->ostream_head);
conn->ostream_head = NULL;
conn->ostream_tail = NULL;
}
+ if (conn->ssltunnel_ostream != NULL) {
+ serf_bucket_destroy(conn->ssltunnel_ostream);
+ conn->ssltunnel_ostream = NULL;
+ }
+
+ writing_queue_empty(conn);
}
static apr_status_t detect_eof(void *baton, serf_bucket_t *aggregate_bucket)
@@ -264,7 +371,7 @@ static apr_status_t do_conn_setup(serf_c
if (status) {
/* extra destroy here since it wasn't added to the head bucket yet. */
serf_bucket_destroy(conn->ostream_tail);
- destroy_ostream(conn);
+ serf__connection_pre_cleanup(conn);
return status;
}
@@ -578,6 +685,8 @@ static apr_status_t reset_connection(ser
conn->unwritten_reqs = NULL;
conn->unwritten_reqs_tail = NULL;
+ serf__connection_pre_cleanup(conn);
+
/* First, cancel all written requests for which we haven't received a
response yet. Inform the application that the request is cancelled,
so it can requeue them if needed. */
@@ -595,7 +704,7 @@ static apr_status_t reset_connection(ser
* Do not copy a CONNECT request to the new connection, the ssl tunnel
* setup code will create a new CONNECT request already.
*/
- if (requeue_requests && !old_reqs->writing_started &&
+ if (requeue_requests && (old_reqs->writing == SERF_WRITING_NONE) &&
!old_reqs->ssltunnel) {
serf_request_t *req = old_reqs;
@@ -626,8 +735,6 @@ static apr_status_t reset_connection(ser
conn->stream = NULL;
}
- destroy_ostream(conn);
-
/* Don't try to resume any writes */
conn->vec_len = 0;
@@ -863,7 +970,7 @@ static apr_status_t write_to_connection(
return status;
}
- if (request && !request->writing_started) {
+ if (request && request->writing == SERF_WRITING_NONE) {
if (request->req_bkt == NULL) {
read_status = serf__setup_request(request);
if (read_status) {
@@ -872,7 +979,7 @@ static apr_status_t write_to_connection(
}
}
- request->writing_started = 1;
+ request->writing = SERF_WRITING_STARTED;
serf_bucket_aggregate_append(ostreamt, request->req_bkt);
}
@@ -893,7 +1000,7 @@ static apr_status_t write_to_connection(
* - we'll see if there are other requests that need to be sent
* ("pipelining").
*/
- request->req_bkt = NULL;
+ request->writing = SERF_WRITING_DONE;
/* Move the request to the written queue */
serf__link_requests(&conn->written_reqs, &conn->written_reqs_tail,
@@ -1026,7 +1133,7 @@ static apr_status_t read_from_connection
* sending the SSL 'close notify' shutdown alert), we'll reset the
* connection and open a new one.
*/
- if (request->req_bkt || !request->writing_started) {
+ if (request->req_bkt || request->writing == SERF_WRITING_NONE) {
const char *data;
apr_size_t len;
@@ -1205,7 +1312,7 @@ static apr_status_t read_from_connection
* update the pollset. We don't want to read from this socket any
* more. We are definitely done with this loop, too.
*/
- if (request == NULL || !request->writing_started) {
+ if (request == NULL || request->writing == SERF_WRITING_NONE) {
conn->dirty_conn = 1;
conn->ctx->dirty_pollset = 1;
status = APR_SUCCESS;
@@ -1365,6 +1472,14 @@ serf_connection_t *serf_connection_creat
conn->perform_teardown = NULL;
conn->protocol_baton = NULL;
+ conn->written_reqs = conn->written_reqs_tail = NULL;
+ conn->nr_of_written_reqs = 0;
+
+ conn->unwritten_reqs = conn->unwritten_reqs_tail = NULL;
+ conn->nr_of_unwritten_reqs;
+
+ conn->done_reqs = conn->done_reqs_tail = 0;
+
/* Create a subpool for our connection. */
apr_pool_create(&conn->skt_pool, conn->pool);
@@ -1459,6 +1574,11 @@ apr_status_t serf_connection_close(
serf_connection_t *conn_seq = GET_CONN(ctx, i);
if (conn_seq == conn) {
+
+ /* Clean up the write bucket first, as this marks all partially written
+ requests as fully written, allowing more efficient cleanup */
+ serf__connection_pre_cleanup(conn);
+
/* The application asked to close the connection, no need to notify
it for each cancelled request. */
while (conn->written_reqs) {
@@ -1479,8 +1599,6 @@ apr_status_t serf_connection_close(
conn->stream = NULL;
}
- destroy_ostream(conn);
-
if (conn->protocol_baton) {
conn->perform_teardown(conn);
conn->protocol_baton = NULL;
Modified: serf/trunk/serf_private.h
URL: http://svn.apache.org/viewvc/serf/trunk/serf_private.h?rev=1712776&r1=1712775&r2=1712776&view=diff
==============================================================================
--- serf/trunk/serf_private.h (original)
+++ serf/trunk/serf_private.h Thu Nov 5 14:05:02 2015
@@ -117,6 +117,13 @@ typedef struct serf_io_baton_t {
} u;
} serf_io_baton_t;
+typedef enum serf_request_writing_t {
+ SERF_WRITING_NONE, /* Nothing written */
+ SERF_WRITING_STARTED, /* Data in write bucket(s) */
+ SERF_WRITING_DONE, /* Everything written */
+ SERF_WRITING_FINISHED, /* Safe to destroy */
+} serf_request_writing_t;
+
/* Holds all the information corresponding to a request/response pair. */
struct serf_request_t {
serf_connection_t *conn;
@@ -140,7 +147,7 @@ struct serf_request_t {
serf_bucket_t *resp_bkt;
- int writing_started;
+ serf_request_writing_t writing;
int priority;
/* 1 if this is a request to setup a SSL tunnel, 0 for normal requests. */
int ssltunnel;
@@ -377,6 +384,12 @@ struct serf_connection_t {
serf_request_t *unwritten_reqs_tail;
unsigned int nr_of_unwritten_reqs;
+ /* Requests that are done, but not destroyed yet because they may still
+ have data pending in their pools. Will be destroyed at several
+ safe points. */
+ serf_request_t *done_reqs;
+ serf_request_t *done_reqs_tail;
+
struct iovec vec[IOV_MAX];
int vec_len;
@@ -437,6 +450,10 @@ struct serf_connection_t {
serf_config_t *config;
};
+/* Called by requests that still have outstanding requests to allow cleaning
+ up buckets that may still reference buckets of this request */
+void serf__connection_pre_cleanup(serf_connection_t *);
+
/*** Internal bucket functions ***/
/* Copies all data contained in vecs to *data, optionally telling how much was