Author: rhuijben Date: Tue Nov 24 15:27:11 2015 New Revision: 1716165 URL: http://svn.apache.org/viewvc?rev=1716165&view=rev Log: Extend the protocol hooks with a hook to notify the protocol about cancelled requests and reprioritization of requests.
We need some basic HTTP/2 prioritization support to help Subversion handle some specific replay requests in a specific order. * outgoing.c (reset_connection, serf_connection_create, serf_connection_set_framing_type): Initialize callbacks to NULL. * outgoing_request.c (serf__destroy_request): Unhook dependencies if set. (serf__cancel_request): Hook cancel callback. (create_request): Initialize new fields. (serf_connection_request_prioritize): New function. * protocols/http2_protocol.c (http2_cancel_request, http2_prioritize_request): New forward declarations. (serf__http2_protocol_init): Hook up functions. (http2_cancel_request, http2_prioritize_request): New functions. * protocols/http2_protocol.h (serf_http2__stream_reset): Use bool. (serf_http2__stream_cancel_request, serf_http2__stream_prioritize_request): New functions. * protocols/http2_stream.c (stream_send_headers): Add argument to support including priority state. (serf_http2__stream_setup_next_request): Set up protocol baton. Send priority info if set on request. (serf_http2__stream_reset): Use bool argument. (serf_http2__stream_cancel_request): New function. (serf_http2__stream_prioritize_request): New function. (http2_stream_enqueue_response): Update caller. * serf.h (SERF_REQUEST_PRIORITY_DEFAULT): New define. (serf_connection_request_prioritize): New function. * serf_private.h (serf_request_t): Add some fields. (serf_connection_t): Add callbacks. 
Modified: serf/trunk/outgoing.c serf/trunk/outgoing_request.c serf/trunk/protocols/http2_protocol.c serf/trunk/protocols/http2_protocol.h serf/trunk/protocols/http2_stream.c serf/trunk/serf.h serf/trunk/serf_private.h Modified: serf/trunk/outgoing.c URL: http://svn.apache.org/viewvc/serf/trunk/outgoing.c?rev=1716165&r1=1716164&r2=1716165&view=diff ============================================================================== --- serf/trunk/outgoing.c (original) +++ serf/trunk/outgoing.c Tue Nov 24 15:27:11 2015 @@ -564,6 +564,8 @@ static apr_status_t reset_connection(ser conn->perform_write = write_to_connection; conn->perform_hangup = hangup_connection; conn->perform_teardown = NULL; + conn->perform_cancel_request = NULL; + conn->perform_prioritize_request = NULL; conn->status = APR_SUCCESS; @@ -1245,6 +1247,8 @@ serf_connection_t *serf_connection_creat conn->perform_write = write_to_connection; conn->perform_hangup = hangup_connection; conn->perform_teardown = NULL; + conn->perform_cancel_request = NULL; + conn->perform_prioritize_request = NULL; conn->protocol_baton = NULL; conn->written_reqs = conn->written_reqs_tail = NULL; @@ -1467,6 +1471,8 @@ void serf_connection_set_framing_type( conn->perform_read = read_from_connection; conn->perform_write = write_to_connection; conn->perform_hangup = hangup_connection; + conn->perform_cancel_request = NULL; + conn->perform_prioritize_request = NULL; conn->perform_teardown = NULL; switch (framing_type) { Modified: serf/trunk/outgoing_request.c URL: http://svn.apache.org/viewvc/serf/trunk/outgoing_request.c?rev=1716165&r1=1716164&r2=1716165&view=diff ============================================================================== --- serf/trunk/outgoing_request.c (original) +++ serf/trunk/outgoing_request.c Tue Nov 24 15:27:11 2015 @@ -101,6 +101,69 @@ apr_status_t serf__destroy_request(serf_ { serf_connection_t *conn = request->conn; + if (request->depends_first && request->depends_on) { + apr_uint64_t total = 0; + 
serf_request_t *r, **pr; + + /* Calculate total priority of descendants */ + for (r = request->depends_first; r; r = r->depends_next) { + total += r->dep_priority; + } + + if (r->priority) + total *= r->priority; + + /* Apply now, as if they depend on the parent */ + for (r = request->depends_first; r; r = r->depends_next) { + if (r->dep_priority) + r->dep_priority = (apr_uint16_t)(total / r->dep_priority); + r->depends_on = request->depends_on; + } + + /* Remove us from parent */ + pr = &request->depends_on->depends_first; + while (*pr) { + if (*pr == request) + *pr = request->depends_next; + + pr = &(*pr)->depends_next; + } + + /* And append all our descendants */ + *pr = request->depends_first; + + request->depends_on = NULL; + request->depends_first = NULL; + request->depends_next = NULL; + } + else if (request->depends_first) { + /* Dependencies will lose their parent */ + serf_request_t *r, *next; + + for (r = request->depends_first; r; r = next) { + next = r->next; + + r->depends_on = NULL; + r->depends_next = NULL; + } + request->depends_first = NULL; + } + else if (request->depends_on) { + serf_request_t **pr; + + /* Remove us from parent */ + pr = &request->depends_on->depends_first; + while (*pr) { + if (*pr == request) { + *pr = request->depends_next; + break; + } + + pr = &(*pr)->depends_next; + } + request->depends_on = NULL; + } + if (request->writing >= SERF_WRITING_STARTED && request->writing < SERF_WRITING_FINISHED) { @@ -136,8 +199,15 @@ apr_status_t serf__cancel_request(serf_r /* We actually don't care what the handler returns. * We have bigger matters at hand. 
*/ - (*request->handler)(request, NULL, request->handler_baton, - request->respool); + (void)request->handler(request, NULL, request->handler_baton, + request->respool); + + request->handler = NULL; + } + + if (request->conn && request->conn->perform_cancel_request) { + request->conn->perform_cancel_request(request, + SERF_ERROR_HTTP2_CANCEL); } if (*list == request) { @@ -352,6 +422,11 @@ create_request(serf_connection_t *conn, request->ssltunnel = ssltunnel; request->next = NULL; request->auth_baton = NULL; + request->protocol_baton = NULL; + request->depends_on = NULL; + request->depends_next = NULL; + request->depends_first = NULL; + request->dep_priority = SERF_REQUEST_PRIORITY_DEFAULT; return request; } @@ -476,6 +551,92 @@ apr_status_t serf_request_cancel(serf_re } +void serf_connection_request_prioritize(serf_request_t *request, + serf_request_t *depends_on, + apr_uint16_t priority, + int exclusive) +{ + if (request->depends_on != depends_on) { + serf_request_t *r; + + if (depends_on->conn != request->conn || depends_on == request) + abort(); + + /* If we are indirectly made dependent on ourself, we first + reprioritize the descendant on our current parent. See + https://tools.ietf.org/html/rfc7540#section-5.3.3 + + If a stream is made dependent on one of its own dependencies, the + formerly dependent stream is first moved to be dependent on the + reprioritized stream's previous parent. The moved dependency + retains its weight. 
*/ + + r = depends_on; + + while (r && r != request && r->depends_on) + r = r->depends_on; + + if (r == request) + { + serf_connection_request_prioritize(depends_on, + request->depends_on, + depends_on->dep_priority, + false /* exclusive */); + } + + if (request->depends_on) { + /* Ok, we can now update our dependency */ + + serf_request_t **pr = &request->depends_on->depends_first; + + while (*pr) { + if (*pr == request) { + *pr = request->depends_next; + break; + } + pr = &(*pr)->depends_next; + } + } + + request->depends_on = depends_on; + + if (depends_on) { + if (exclusive) { + r = depends_on->depends_first; + + while (r) { + r->depends_on = request; + + if (r->depends_next) + r = r->depends_next; + else + break; + } + + if (r) { + r->depends_next = request->depends_first; + r->depends_first = depends_on->depends_first; + } + request->depends_next = NULL; + } + else { + request->depends_next = depends_on->depends_first; + } + depends_on->depends_first = request; + } + else + request->depends_next = NULL; + } + + if (priority) + request->dep_priority = priority; + + /* And now tell the protocol about this */ + if (request->conn->perform_prioritize_request) + request->conn->perform_prioritize_request(request, exclusive != 0); +} + + apr_status_t serf_request_is_written(serf_request_t *request) { if (request->writing >= SERF_WRITING_FINISHED) Modified: serf/trunk/protocols/http2_protocol.c URL: http://svn.apache.org/viewvc/serf/trunk/protocols/http2_protocol.c?rev=1716165&r1=1716164&r2=1716165&view=diff ============================================================================== --- serf/trunk/protocols/http2_protocol.c (original) +++ serf/trunk/protocols/http2_protocol.c Tue Nov 24 15:27:11 2015 @@ -58,6 +58,12 @@ http2_incoming_teardown(serf_incoming_t static apr_status_t http2_process(serf_http2_protocol_t *h2); +static void +http2_cancel_request(serf_request_t *rq, apr_status_t reason); + +static void +http2_prioritize_request(serf_request_t *rq, bool 
exclusive); + static serf_bucket_t * serf_bucket_create_numberv(serf_bucket_alloc_t *allocator, const char *format, ...) @@ -280,6 +286,8 @@ void serf__http2_protocol_init(serf_conn conn->perform_write = http2_outgoing_write; conn->perform_hangup = http2_outgoing_hangup; conn->perform_teardown = http2_outgoing_teardown; + conn->perform_cancel_request = http2_cancel_request; + conn->perform_prioritize_request = http2_prioritize_request; conn->protocol_baton = h2; /* Disable HTTP/1.1 guessing that affects writability */ @@ -1670,6 +1678,28 @@ http2_outgoing_teardown(serf_connection_ conn->protocol_baton = NULL; } +static void http2_cancel_request(serf_request_t *rq, apr_status_t reason) +{ + serf_connection_t *conn = rq->conn; + + if (!conn || !conn->protocol_baton || !rq->protocol_baton) + return; + + serf_http2__stream_cancel_request(rq->protocol_baton, + rq, reason); +} + +static void http2_prioritize_request(serf_request_t *rq, bool exclusive) +{ + serf_connection_t *conn = rq->conn; + + if (!conn || !conn->protocol_baton || !rq->protocol_baton) + return; + + serf_http2__stream_prioritize_request(rq->protocol_baton, + rq, exclusive); +} + static apr_status_t http2_incoming_read(serf_incoming_t *client) { Modified: serf/trunk/protocols/http2_protocol.h URL: http://svn.apache.org/viewvc/serf/trunk/protocols/http2_protocol.h?rev=1716165&r1=1716164&r2=1716165&view=diff ============================================================================== --- serf/trunk/protocols/http2_protocol.h (original) +++ serf/trunk/protocols/http2_protocol.h Tue Nov 24 15:27:11 2015 @@ -222,7 +222,14 @@ void serf_http2__ensure_writable(serf_ht apr_status_t serf_http2__stream_reset(serf_http2_stream_t *stream, apr_status_t reason, - int local_reset); + bool local_reset); + +void serf_http2__stream_cancel_request(serf_http2_stream_t *stream, + serf_request_t *rq, + apr_status_t reason); + +void serf_http2__stream_prioritize_request(serf_http2_stream_t *stream, + serf_request_t *rq, bool 
exclusive); serf_bucket_t * serf_http2__stream_handle_hpack(serf_http2_stream_t *stream, Modified: serf/trunk/protocols/http2_stream.c URL: http://svn.apache.org/viewvc/serf/trunk/protocols/http2_stream.c?rev=1716165&r1=1716164&r2=1716165&view=diff ============================================================================== --- serf/trunk/protocols/http2_stream.c (original) +++ serf/trunk/protocols/http2_stream.c Tue Nov 24 15:27:11 2015 @@ -97,7 +97,8 @@ serf_http2__stream_cleanup(serf_http2_st static apr_status_t stream_send_headers(serf_http2_stream_t *stream, serf_bucket_t *hpack, apr_size_t max_payload_size, - bool end_stream) + bool end_stream, + bool priority) { apr_status_t status; bool first_frame = true; @@ -138,7 +139,10 @@ static apr_status_t stream_send_headers( : 0) | ((hpack != NULL) ? 0 - : HTTP2_FLAG_END_HEADERS), + : HTTP2_FLAG_END_HEADERS) + | (priority + ? HTTP2_FLAG_PRIORITY + : 0), &stream->streamid, serf_http2__allocate_stream_id, stream, @@ -320,12 +324,14 @@ serf_http2__stream_setup_next_request(se serf_bucket_t *hpack; serf_bucket_t *body; bool end_stream; + bool priority = false; SERF_H2_assert(request != NULL); if (!request) return APR_EGENERAL; stream->data->request = request; + request->protocol_baton = stream; if (!request->req_bkt) { status = serf__setup_request(request); @@ -353,6 +359,35 @@ serf_http2__stream_setup_next_request(se if (status) return status; + if (request->depends_on && request->depends_on->protocol_baton) + { + serf_http2_stream_t *ds = request->depends_on->protocol_baton; + + if (ds->streamid >= 0) { + serf_bucket_t *agg; + unsigned char priority_data[5]; + + agg = serf_bucket_aggregate_create(request->allocator); + + priority_data[0] = (ds->streamid >> 24) & 0x7F; + /* bit 7 of [0] is the exclusive flag */ + priority_data[1] = (ds->streamid >> 16) & 0xFF; + priority_data[2] = (ds->streamid >> 8) & 0xFF; + priority_data[3] = ds->streamid & 0xFF; + priority_data[4] = request->dep_priority >> 8; + + 
serf_bucket_aggregate_append( + agg, + serf_bucket_simple_copy_create((void *)priority_data, + 5, request->allocator)); + + serf_bucket_aggregate_append(agg, hpack); + hpack = agg; + + priority = true; + } + } + if (!body) { serf_bucket_destroy(request->req_bkt); request->req_bkt = NULL; @@ -362,7 +397,7 @@ serf_http2__stream_setup_next_request(se end_stream = false; status = stream_send_headers(stream, hpack, max_payload_size, - end_stream); + end_stream, priority); if (status) return status; @@ -388,7 +423,7 @@ serf_http2__stream_setup_next_request(se apr_status_t serf_http2__stream_reset(serf_http2_stream_t *stream, apr_status_t reason, - int local_reset) + bool local_reset) { stream->status = H2S_CLOSED; @@ -403,6 +438,47 @@ serf_http2__stream_reset(serf_http2_stre return APR_SUCCESS; } +void serf_http2__stream_cancel_request(serf_http2_stream_t *stream, + serf_request_t *rq, + apr_status_t reason) +{ + if (stream->streamid < 0) + return; /* Never hit the wire */ + else if (stream->status == H2S_CLOSED) + return; /* We are already detached */ + + if (reason < SERF_ERROR_HTTP2_NO_ERROR + || reason > SERF_ERROR_HTTP2_HTTP_1_1_REQUIRED) + { + reason = SERF_ERROR_HTTP2_CANCEL; + } + + /* Let the other party know we don't want anything */ + serf_http2__stream_reset(stream, reason, true); + + if (!stream->data) + return; + + if (stream->data && stream->data->request) + stream->data->request = NULL; + + /* Would be nice if we could response_agg, but that is typically + not safe here, as we might still be reading from it */ +} + +void serf_http2__stream_prioritize_request(serf_http2_stream_t *stream, + serf_request_t *rq, + bool exclusive) +{ + if (stream->streamid < 0) + return; /* Never hit the wire */ + else if (stream->status == H2S_CLOSED) + return; /* We are already detached */ + + /* Ignore for now. 
We start by handling this at setup */ +} + + static apr_status_t stream_response_eof(void *baton, serf_bucket_t *aggregate_bucket) @@ -476,7 +552,7 @@ http2_stream_enqueue_response(serf_incom status = stream_send_headers(stream, hpack, serf_http2__max_payload_size(stream->h2), - false); + false /* eos */, false /* priority */); if (status) return status; Modified: serf/trunk/serf.h URL: http://svn.apache.org/viewvc/serf/trunk/serf.h?rev=1716165&r1=1716164&r2=1716165&view=diff ============================================================================== --- serf/trunk/serf.h (original) +++ serf/trunk/serf.h Tue Nov 24 15:27:11 2015 @@ -722,6 +722,39 @@ serf_request_t *serf_connection_priority serf_request_setup_t setup, void *setup_baton); +/** The default request priority */ +#define SERF_REQUEST_PRIORITY_DEFAULT 0x1000 + +/** + * Updates the request's priority information. Some protocol implementations, + * such as HTTP/2 may use this information for response scheduling. The + * actual behavior depends on the server, intermediate proxies and of course + * the protocol implementation. + * + * It is recommended to prioritize a request before sending it to the server, + * as that avoids race conditions and receiving unwanted results. + * + * If @a depends_on is set, then the request is marked as dependent on + * @a depends_on, and the result of @a request will only be received if + * no progress can be made on @a depends_on itself. + * + * @a priority is used to relatively prioritize multiple dependencies on the + * same target. Passing 0 will keep the original priority. In case of HTTP/2 + * this value is mapped to a 8 bit value by ignoring the lowest 8 bits. + * + * By default a request is created at priority SERF_REQUEST_PRIORITY_DEFAULT. + * + * If @a exclusive is set to TRUE, then all existing dependencies on @a + * depends_on will be updated to now depend on @a request, to make @a + * request the only dependency of @a request. 
When FALSE, request will just + * be added as a dependency. + * + * @since New in 1.4. + */ +void serf_connection_request_prioritize(serf_request_t *request, + serf_request_t *depends_on, + apr_uint16_t priority, + int exclusive); /** Returns detected network latency for the @a conn connection. Negative * value means that latency is unknwon. Modified: serf/trunk/serf_private.h URL: http://svn.apache.org/viewvc/serf/trunk/serf_private.h?rev=1716165&r1=1716164&r2=1716165&view=diff ============================================================================== --- serf/trunk/serf_private.h (original) +++ serf/trunk/serf_private.h Tue Nov 24 15:27:11 2015 @@ -235,6 +235,11 @@ struct serf_request_t { /* 1 if this is a request to setup a SSL tunnel, 0 for normal requests. */ int ssltunnel; + serf_request_t *depends_on; /* On what request do we depend */ + serf_request_t *depends_next; /* Next dependency on parent*/ + serf_request_t *depends_first; /* First dependency on us */ + apr_uint16_t dep_priority; + /* This baton is currently only used for digest authentication, which needs access to the uri of the request in the response handler. If serf_request_t is replaced by a serf_http_request_t in the future, @@ -242,6 +247,10 @@ struct serf_request_t { anymore. */ void *auth_baton; + /* This baton is free to set by protocol handlers. They typically use it + for identifying or storing related information */ + void *protocol_baton; + struct serf_request_t *next; }; @@ -581,6 +590,12 @@ struct serf_connection_t { void (*perform_teardown)(serf_connection_t *conn); void *protocol_baton; + /* Request callbacks. NULL unless handled by the protocol implementation */ + void (*perform_cancel_request)(serf_request_t *request, + apr_status_t reason); + void (*perform_prioritize_request)(serf_request_t *request, + bool exclusive); + /* Configuration shared with buckets and authn plugins */ serf_config_t *config; };