Author: rhuijben
Date: Tue Nov 24 15:27:11 2015
New Revision: 1716165
URL: http://svn.apache.org/viewvc?rev=1716165&view=rev
Log:
Extend the protocol hooks with a hook to notify the protocol about cancelled
requests and reprioritization of requests.
We need some basic http/2 prioritization support to help Subversion process
some specific replay requests in a specific order.
* outgoing.c
(reset_connection,
serf_connection_create,
serf_connection_set_framing_type): Initialize callbacks to NULL.
* outgoing_request.c
(serf__destroy_request): Unhook dependencies if set.
(serf__cancel_request): Hook cancel callback.
(create_request): Initialize new fields.
(serf_connection_request_prioritize): New function.
* protocols/http2_protocol.c
(http2_cancel_request,
http2_prioritize_request): New functions.
(serf__http2_protocol_init): Hook up functions.
(http2_cancel_request,
http2_prioritize_request): New functions.
* protocols/http2_protocol.h
(serf_http2__stream_reset): Use bool.
(serf_http2__stream_cancel_request,
serf_http2__stream_prioritize_request): New functions.
* protocols/http2_stream.c
(stream_send_headers): Add argument to support including priority state.
(serf_http2__stream_setup_next_request): Setup protocol baton. Send
priority info if set on request.
(serf_http2__stream_reset): Use bool argument.
(serf_http2__stream_cancel_request): New function.
(serf_http2__stream_prioritize_request): New function.
(http2_stream_enqueue_response): Update caller.
* serf.h
(SERF_REQUEST_PRIORITY_DEFAULT): New define.
(serf_connection_request_prioritize): New function.
* serf_private.h
(serf_request_t): Add some fields.
(serf_connection_t): Add callbacks.
Modified:
serf/trunk/outgoing.c
serf/trunk/outgoing_request.c
serf/trunk/protocols/http2_protocol.c
serf/trunk/protocols/http2_protocol.h
serf/trunk/protocols/http2_stream.c
serf/trunk/serf.h
serf/trunk/serf_private.h
Modified: serf/trunk/outgoing.c
URL:
http://svn.apache.org/viewvc/serf/trunk/outgoing.c?rev=1716165&r1=1716164&r2=1716165&view=diff
==============================================================================
--- serf/trunk/outgoing.c (original)
+++ serf/trunk/outgoing.c Tue Nov 24 15:27:11 2015
@@ -564,6 +564,8 @@ static apr_status_t reset_connection(ser
conn->perform_write = write_to_connection;
conn->perform_hangup = hangup_connection;
conn->perform_teardown = NULL;
+ conn->perform_cancel_request = NULL;
+ conn->perform_prioritize_request = NULL;
conn->status = APR_SUCCESS;
@@ -1245,6 +1247,8 @@ serf_connection_t *serf_connection_creat
conn->perform_write = write_to_connection;
conn->perform_hangup = hangup_connection;
conn->perform_teardown = NULL;
+ conn->perform_cancel_request = NULL;
+ conn->perform_prioritize_request = NULL;
conn->protocol_baton = NULL;
conn->written_reqs = conn->written_reqs_tail = NULL;
@@ -1467,6 +1471,8 @@ void serf_connection_set_framing_type(
conn->perform_read = read_from_connection;
conn->perform_write = write_to_connection;
conn->perform_hangup = hangup_connection;
+ conn->perform_cancel_request = NULL;
+ conn->perform_prioritize_request = NULL;
conn->perform_teardown = NULL;
switch (framing_type) {
Modified: serf/trunk/outgoing_request.c
URL:
http://svn.apache.org/viewvc/serf/trunk/outgoing_request.c?rev=1716165&r1=1716164&r2=1716165&view=diff
==============================================================================
--- serf/trunk/outgoing_request.c (original)
+++ serf/trunk/outgoing_request.c Tue Nov 24 15:27:11 2015
@@ -101,6 +101,69 @@ apr_status_t serf__destroy_request(serf_
{
serf_connection_t *conn = request->conn;
+ if (request->depends_first && request->depends_on) {
+ apr_uint64_t total = 0;
+ serf_request_t *r, **pr;
+
+ /* Calculate total priority of descendants */
+ for (r = request->depends_first; r; r = r->depends_next) {
+ total += r->dep_priority;
+ }
+
+ if (r->priority)
+ total *= r->priority;
+
+ /* Apply now, as if they depend on the parent */
+ for (r = request->depends_first; r; r = r->depends_next) {
+ if (r->dep_priority)
+ r->dep_priority = (apr_uint16_t)(total / r->dep_priority);
+ r->depends_on = request->depends_on;
+ }
+
+ /* Remove us from parent */
+ pr = &request->depends_on->depends_first;
+ while (*pr) {
+ if (*pr == request)
+ *pr = request->depends_next;
+
+ pr = &(*pr)->depends_next;
+ }
+
+ /* And append all our descendants */
+ *pr = request->depends_first;
+
+ request->depends_on = NULL;
+ request->depends_first = NULL;
+ request->depends_next = NULL;
+ }
+ else if (request->depends_first) {
+ /* Dependencies will lose their parent */
+ serf_request_t *r, *next;
+
+ for (r = request->depends_first; r; r = next) {
+ next = r->next;
+
+ r->depends_on = NULL;
+ r->depends_next = NULL;
+ }
+ request->depends_first = NULL;
+ }
+ else if (request->depends_on) {
+ serf_request_t **pr;
+
+ /* Remove us from parent */
+ pr = &request->depends_on->depends_first;
+ while (*pr) {
+ if (*pr == request) {
+ *pr = request->depends_next;
+ break;
+ }
+
+ pr = &(*pr)->depends_next;
+ }
+ request->depends_on = NULL;
+ }
+
if (request->writing >= SERF_WRITING_STARTED
&& request->writing < SERF_WRITING_FINISHED) {
@@ -136,8 +199,15 @@ apr_status_t serf__cancel_request(serf_r
/* We actually don't care what the handler returns.
* We have bigger matters at hand.
*/
- (*request->handler)(request, NULL, request->handler_baton,
- request->respool);
+ (void)request->handler(request, NULL, request->handler_baton,
+ request->respool);
+
+ request->handler = NULL;
+ }
+
+ if (request->conn && request->conn->perform_cancel_request) {
+ request->conn->perform_cancel_request(request,
+ SERF_ERROR_HTTP2_CANCEL);
}
if (*list == request) {
@@ -352,6 +422,11 @@ create_request(serf_connection_t *conn,
request->ssltunnel = ssltunnel;
request->next = NULL;
request->auth_baton = NULL;
+ request->protocol_baton = NULL;
+ request->depends_on = NULL;
+ request->depends_next = NULL;
+ request->depends_first = NULL;
+ request->dep_priority = SERF_REQUEST_PRIORITY_DEFAULT;
return request;
}
@@ -476,6 +551,92 @@ apr_status_t serf_request_cancel(serf_re
}
+/* Re-parent REQUEST below DEPENDS_ON in the connection-local dependency
+   tree, optionally update its weight and then notify the protocol
+   implementation through conn->perform_prioritize_request (if set).
+
+   DEPENDS_ON may be NULL, in which case REQUEST is only detached from its
+   current parent. A non-NULL DEPENDS_ON must be a different request on the
+   same connection; anything else is a caller bug and aborts.
+
+   PRIORITY of 0 keeps the current weight. With EXCLUSIVE set, all current
+   dependencies of DEPENDS_ON are moved below REQUEST, so REQUEST becomes
+   the only direct dependency of DEPENDS_ON. */
+void serf_connection_request_prioritize(serf_request_t *request,
+                                        serf_request_t *depends_on,
+                                        apr_uint16_t priority,
+                                        int exclusive)
+{
+    if (request->depends_on != depends_on) {
+        serf_request_t *r;
+
+        /* Only validate a real new parent. NULL is legal here: it is what
+           the recursive call below passes when REQUEST has no parent. */
+        if (depends_on
+            && (depends_on->conn != request->conn || depends_on == request))
+            abort();
+
+        /* If we are indirectly made dependent on ourself, we first
+           reprioritize the descendant on our current parent. See
+           https://tools.ietf.org/html/rfc7540#section-5.3.3
+
+             If a stream is made dependent on one of its own dependencies, the
+             formerly dependent stream is first moved to be dependent on the
+             reprioritized stream's previous parent. The moved dependency
+             retains its weight. */
+
+        r = depends_on;
+
+        while (r && r != request && r->depends_on)
+            r = r->depends_on;
+
+        if (r == request)
+        {
+            serf_connection_request_prioritize(depends_on,
+                                               request->depends_on,
+                                               depends_on->dep_priority,
+                                               false /* exclusive */);
+        }
+
+        if (request->depends_on) {
+            /* Ok, we can now update our dependency: unlink us from the
+               dependency list of our current parent */
+
+            serf_request_t **pr = &request->depends_on->depends_first;
+
+            while (*pr) {
+                if (*pr == request) {
+                    *pr = request->depends_next;
+                    break;
+                }
+                pr = &(*pr)->depends_next;
+            }
+        }
+
+        request->depends_on = depends_on;
+
+        if (depends_on) {
+            if (exclusive) {
+                /* Adopt every current dependency of DEPENDS_ON, leaving R
+                   at the last one (or NULL when there are none) */
+                r = depends_on->depends_first;
+
+                while (r) {
+                    r->depends_on = request;
+
+                    if (r->depends_next)
+                        r = r->depends_next;
+                    else
+                        break;
+                }
+
+                if (r) {
+                    /* Chain our own dependencies behind the adopted ones
+                       and make the combined list ours. (Storing the list
+                       in R->depends_first instead would make R its own
+                       ancestor and drop the adopted requests from us.) */
+                    r->depends_next = request->depends_first;
+                    request->depends_first = depends_on->depends_first;
+                }
+                request->depends_next = NULL;
+            }
+            else {
+                request->depends_next = depends_on->depends_first;
+            }
+            depends_on->depends_first = request;
+        }
+        else
+            request->depends_next = NULL;
+    }
+
+    /* 0 means: keep the current weight */
+    if (priority)
+        request->dep_priority = priority;
+
+    /* And now tell the protocol about this */
+    if (request->conn->perform_prioritize_request)
+        request->conn->perform_prioritize_request(request, exclusive != 0);
+}
+
+
apr_status_t serf_request_is_written(serf_request_t *request)
{
if (request->writing >= SERF_WRITING_FINISHED)
Modified: serf/trunk/protocols/http2_protocol.c
URL:
http://svn.apache.org/viewvc/serf/trunk/protocols/http2_protocol.c?rev=1716165&r1=1716164&r2=1716165&view=diff
==============================================================================
--- serf/trunk/protocols/http2_protocol.c (original)
+++ serf/trunk/protocols/http2_protocol.c Tue Nov 24 15:27:11 2015
@@ -58,6 +58,12 @@ http2_incoming_teardown(serf_incoming_t
static apr_status_t
http2_process(serf_http2_protocol_t *h2);
+/* Forward declaration. serf__http2_protocol_init() installs this as the
+   connection's perform_cancel_request callback. */
+static void
+http2_cancel_request(serf_request_t *rq, apr_status_t reason);
+
+/* Forward declaration. serf__http2_protocol_init() installs this as the
+   connection's perform_prioritize_request callback. */
+static void
+http2_prioritize_request(serf_request_t *rq, bool exclusive);
+
static serf_bucket_t *
serf_bucket_create_numberv(serf_bucket_alloc_t *allocator, const char *format,
...)
@@ -280,6 +286,8 @@ void serf__http2_protocol_init(serf_conn
conn->perform_write = http2_outgoing_write;
conn->perform_hangup = http2_outgoing_hangup;
conn->perform_teardown = http2_outgoing_teardown;
+ conn->perform_cancel_request = http2_cancel_request;
+ conn->perform_prioritize_request = http2_prioritize_request;
conn->protocol_baton = h2;
/* Disable HTTP/1.1 guessing that affects writability */
@@ -1670,6 +1678,28 @@ http2_outgoing_teardown(serf_connection_
conn->protocol_baton = NULL;
}
+/* perform_cancel_request hook for HTTP/2 connections.
+
+   Hands the cancellation of RQ, with REASON, to the stream stored in
+   RQ->protocol_baton. Does nothing when the request has no connection,
+   the connection has no protocol baton, or the request has no stream
+   attached. */
+static void http2_cancel_request(serf_request_t *rq, apr_status_t reason)
+{
+    const serf_connection_t *owner = rq->conn;
+
+    if (owner != NULL
+        && owner->protocol_baton != NULL
+        && rq->protocol_baton != NULL) {
+        serf_http2__stream_cancel_request(rq->protocol_baton, rq, reason);
+    }
+}
+
+/* perform_prioritize_request hook for HTTP/2 connections.
+
+   Passes the priority change of RQ (and the EXCLUSIVE flag) on to the
+   stream stored in RQ->protocol_baton. Does nothing when the request has
+   no connection, the connection has no protocol baton, or the request has
+   no stream attached. */
+static void http2_prioritize_request(serf_request_t *rq, bool exclusive)
+{
+    const serf_connection_t *owner = rq->conn;
+
+    if (owner != NULL
+        && owner->protocol_baton != NULL
+        && rq->protocol_baton != NULL) {
+        serf_http2__stream_prioritize_request(rq->protocol_baton, rq,
+                                              exclusive);
+    }
+}
+
static apr_status_t
http2_incoming_read(serf_incoming_t *client)
{
Modified: serf/trunk/protocols/http2_protocol.h
URL:
http://svn.apache.org/viewvc/serf/trunk/protocols/http2_protocol.h?rev=1716165&r1=1716164&r2=1716165&view=diff
==============================================================================
--- serf/trunk/protocols/http2_protocol.h (original)
+++ serf/trunk/protocols/http2_protocol.h Tue Nov 24 15:27:11 2015
@@ -222,7 +222,14 @@ void serf_http2__ensure_writable(serf_ht
apr_status_t
serf_http2__stream_reset(serf_http2_stream_t *stream,
apr_status_t reason,
- int local_reset);
+ bool local_reset);
+
+/* Cancels the request RQ that is handled by STREAM. REASON is passed on
+   when resetting the stream; values outside the SERF_ERROR_HTTP2_* range
+   are replaced by SERF_ERROR_HTTP2_CANCEL. */
+void serf_http2__stream_cancel_request(serf_http2_stream_t *stream,
+ serf_request_t *rq,
+ apr_status_t reason);
+
+/* Notifies STREAM that the priority information of its request RQ changed.
+   EXCLUSIVE is the flag that was passed to the prioritize call. */
+void serf_http2__stream_prioritize_request(serf_http2_stream_t *stream,
+ serf_request_t *rq, bool exclusive);
serf_bucket_t *
serf_http2__stream_handle_hpack(serf_http2_stream_t *stream,
Modified: serf/trunk/protocols/http2_stream.c
URL:
http://svn.apache.org/viewvc/serf/trunk/protocols/http2_stream.c?rev=1716165&r1=1716164&r2=1716165&view=diff
==============================================================================
--- serf/trunk/protocols/http2_stream.c (original)
+++ serf/trunk/protocols/http2_stream.c Tue Nov 24 15:27:11 2015
@@ -97,7 +97,8 @@ serf_http2__stream_cleanup(serf_http2_st
static apr_status_t stream_send_headers(serf_http2_stream_t *stream,
serf_bucket_t *hpack,
apr_size_t max_payload_size,
- bool end_stream)
+ bool end_stream,
+ bool priority)
{
apr_status_t status;
bool first_frame = true;
@@ -138,7 +139,10 @@ static apr_status_t stream_send_headers(
: 0)
| ((hpack != NULL)
? 0
- : HTTP2_FLAG_END_HEADERS),
+ : HTTP2_FLAG_END_HEADERS)
+ | (priority
+ ? HTTP2_FLAG_PRIORITY
+ : 0),
&stream->streamid,
serf_http2__allocate_stream_id,
stream,
@@ -320,12 +324,14 @@ serf_http2__stream_setup_next_request(se
serf_bucket_t *hpack;
serf_bucket_t *body;
bool end_stream;
+ bool priority = false;
SERF_H2_assert(request != NULL);
if (!request)
return APR_EGENERAL;
stream->data->request = request;
+ request->protocol_baton = stream;
if (!request->req_bkt) {
status = serf__setup_request(request);
@@ -353,6 +359,35 @@ serf_http2__stream_setup_next_request(se
if (status)
return status;
+ if (request->depends_on && request->depends_on->protocol_baton)
+ {
+ serf_http2_stream_t *ds = request->depends_on->protocol_baton;
+
+ if (ds->streamid >= 0) {
+ serf_bucket_t *agg;
+ unsigned char priority_data[5];
+
+ agg = serf_bucket_aggregate_create(request->allocator);
+
+ priority_data[0] = (ds->streamid >> 24) & 0x7F;
+ /* bit 7 of [0] is the exclusive flag */
+ priority_data[1] = (ds->streamid >> 16) & 0xFF;
+ priority_data[2] = (ds->streamid >> 8) & 0xFF;
+ priority_data[3] = ds->streamid & 0xFF;
+ priority_data[4] = request->dep_priority >> 8;
+
+ serf_bucket_aggregate_append(
+ agg,
+ serf_bucket_simple_copy_create((void *)priority_data,
+ 5, request->allocator));
+
+ serf_bucket_aggregate_append(agg, hpack);
+ hpack = agg;
+
+ priority = true;
+ }
+ }
+
if (!body) {
serf_bucket_destroy(request->req_bkt);
request->req_bkt = NULL;
@@ -362,7 +397,7 @@ serf_http2__stream_setup_next_request(se
end_stream = false;
status = stream_send_headers(stream, hpack, max_payload_size,
- end_stream);
+ end_stream, priority);
if (status)
return status;
@@ -388,7 +423,7 @@ serf_http2__stream_setup_next_request(se
apr_status_t
serf_http2__stream_reset(serf_http2_stream_t *stream,
apr_status_t reason,
- int local_reset)
+ bool local_reset)
{
stream->status = H2S_CLOSED;
@@ -403,6 +438,47 @@ serf_http2__stream_reset(serf_http2_stre
return APR_SUCCESS;
}
+/* Cancel the request handled by STREAM.
+
+   Streams that never received a stream id or that are already closed are
+   left alone. Otherwise the stream is reset locally with REASON (replaced
+   by SERF_ERROR_HTTP2_CANCEL when it is not in the SERF_ERROR_HTTP2_*
+   range) and the stream's reference to its request is cleared. RQ itself
+   is not used. */
+void serf_http2__stream_cancel_request(serf_http2_stream_t *stream,
+                                       serf_request_t *rq,
+                                       apr_status_t reason)
+{
+    /* Negative id: never hit the wire. Closed: we are already detached. */
+    if (stream->streamid < 0 || stream->status == H2S_CLOSED)
+        return;
+
+    if (!(reason >= SERF_ERROR_HTTP2_NO_ERROR
+          && reason <= SERF_ERROR_HTTP2_HTTP_1_1_REQUIRED)) {
+        reason = SERF_ERROR_HTTP2_CANCEL;
+    }
+
+    /* Let the other party know we don't want anything */
+    serf_http2__stream_reset(stream, reason, true);
+
+    /* Forget the request, so nothing is delivered to it anymore */
+    if (stream->data != NULL && stream->data->request != NULL)
+        stream->data->request = NULL;
+
+    /* It would be nice to also clean up response_agg here, but that is
+       typically not safe, as we might still be reading from it */
+}
+
+/* Called when the priority of RQ, handled by STREAM, was changed.
+
+   Currently a no-op: priority information is only sent while setting up
+   the request. RQ and EXCLUSIVE are not used yet. */
+void serf_http2__stream_prioritize_request(serf_http2_stream_t *stream,
+                                           serf_request_t *rq,
+                                           bool exclusive)
+{
+    if (stream->streamid < 0 || stream->status == H2S_CLOSED) {
+        /* Either never hit the wire, or we are already detached */
+        return;
+    }
+
+    /* Ignore for now. We start by handling this at setup */
+}
+
+
static apr_status_t
stream_response_eof(void *baton,
serf_bucket_t *aggregate_bucket)
@@ -476,7 +552,7 @@ http2_stream_enqueue_response(serf_incom
status = stream_send_headers(stream, hpack,
serf_http2__max_payload_size(stream->h2),
- false);
+ false /* eos */, false /* priority */);
if (status)
return status;
Modified: serf/trunk/serf.h
URL:
http://svn.apache.org/viewvc/serf/trunk/serf.h?rev=1716165&r1=1716164&r2=1716165&view=diff
==============================================================================
--- serf/trunk/serf.h (original)
+++ serf/trunk/serf.h Tue Nov 24 15:27:11 2015
@@ -722,6 +722,39 @@ serf_request_t *serf_connection_priority
serf_request_setup_t setup,
void *setup_baton);
+/** The default request priority. This is a 16 bit value; the HTTP/2
+ * implementation only sends its upper 8 bits (0x10 for this default). */
+#define SERF_REQUEST_PRIORITY_DEFAULT 0x1000
+
+/**
+ * Updates the request's priority information. Some protocol implementations,
+ * such as HTTP/2, may use this information for response scheduling. The
+ * actual behavior depends on the server, intermediate proxies and of course
+ * the protocol implementation.
+ *
+ * It is recommended to prioritize a request before sending it to the server,
+ * as that avoids race conditions and receiving unwanted results.
+ *
+ * If @a depends_on is set, then the request is marked as dependent on
+ * @a depends_on, and the result of @a request will only be received if
+ * no progress can be made on @a depends_on itself. @a depends_on must be a
+ * different request on the same connection as @a request; the
+ * implementation calls abort() otherwise.
+ *
+ * @a priority is used to relatively prioritize multiple dependencies on the
+ * same target. Passing 0 will keep the original priority. In case of HTTP/2
+ * this value is mapped to an 8 bit value by ignoring the lowest 8 bits.
+ *
+ * By default a request is created at priority SERF_REQUEST_PRIORITY_DEFAULT.
+ *
+ * If @a exclusive is set to TRUE, then all existing dependencies on @a
+ * depends_on will be updated to now depend on @a request, to make @a
+ * request the only dependency of @a depends_on. When FALSE, @a request will
+ * just be added as a dependency.
+ *
+ * @since New in 1.4.
+ */
+void serf_connection_request_prioritize(serf_request_t *request,
+ serf_request_t *depends_on,
+ apr_uint16_t priority,
+ int exclusive);
/** Returns detected network latency for the @a conn connection. Negative
* value means that latency is unknown.
Modified: serf/trunk/serf_private.h
URL:
http://svn.apache.org/viewvc/serf/trunk/serf_private.h?rev=1716165&r1=1716164&r2=1716165&view=diff
==============================================================================
--- serf/trunk/serf_private.h (original)
+++ serf/trunk/serf_private.h Tue Nov 24 15:27:11 2015
@@ -235,6 +235,11 @@ struct serf_request_t {
/* 1 if this is a request to setup a SSL tunnel, 0 for normal requests. */
int ssltunnel;
+ serf_request_t *depends_on; /* On what request do we depend */
+ serf_request_t *depends_next; /* Next dependency on parent*/
+ serf_request_t *depends_first; /* First dependency on us */
+ apr_uint16_t dep_priority;
+
/* This baton is currently only used for digest authentication, which
needs access to the uri of the request in the response handler.
If serf_request_t is replaced by a serf_http_request_t in the future,
@@ -242,6 +247,10 @@ struct serf_request_t {
anymore. */
void *auth_baton;
+ /* This baton is free to set by protocol handlers. They typically use it
+ for identifying or storing related information */
+ void *protocol_baton;
+
struct serf_request_t *next;
};
@@ -581,6 +590,12 @@ struct serf_connection_t {
void (*perform_teardown)(serf_connection_t *conn);
void *protocol_baton;
+ /* Request callbacks. NULL unless handled by the protocol implementation */
+ void (*perform_cancel_request)(serf_request_t *request,
+ apr_status_t reason);
+ void (*perform_prioritize_request)(serf_request_t *request,
+ bool exclusive);
+
/* Configuration shared with buckets and authn plugins */
serf_config_t *config;
};