Author: rhuijben
Date: Tue Nov 24 15:27:11 2015
New Revision: 1716165

URL: http://svn.apache.org/viewvc?rev=1716165&view=rev
Log:
Extend the protocol hooks with a hook to notify the protocol about cancelled
requests and reprioritization of requests.

We need some basic http/2 prioritization support to help Subversion process
some specific replay requests in a specific order.

* outgoing.c
  (reset_connection,
   serf_connection_create,
   serf_connection_set_framing_type): Initialize callbacks to NULL.

* outgoing_request.c
  (serf__destroy_request): Unhook dependencies if set.
  (serf__cancel_request): Hook cancel callback.
  (create_request): Initialize new fields.
  (serf_connection_request_prioritize): New function.

* protocols/http2_protocol.c
  (http2_cancel_request,
   http2_prioritize_request): New functions.
  (serf__http2_protocol_init): Hook up functions.
  (http2_cancel_request,
   http2_prioritize_request): New function.

* protocols/http2_protocol.h
  (serf_http2__stream_reset): Use bool.
  (serf_http2__stream_cancel_request,
   serf_http2__stream_prioritize_request): New functions.

* protocols/http2_stream.c
  (stream_send_headers): Add argument to support including priority state.
  (serf_http2__stream_setup_next_request): Setup protocol baton. Send
    priority info if set on request.
  (serf_http2__stream_reset): Use bool argument.
  (serf_http2__stream_cancel_request): New function.
  (serf_http2__stream_prioritize_request): New function.
  (http2_stream_enqueue_response): Update caller.

* serf.h
  (SERF_REQUEST_PRIORITY_DEFAULT): New define.
  (serf_connection_request_prioritize): New function.

* serf_private.h
  (serf_request_t): Add some fields.
  (serf_connection_t): Add callbacks.

Modified:
    serf/trunk/outgoing.c
    serf/trunk/outgoing_request.c
    serf/trunk/protocols/http2_protocol.c
    serf/trunk/protocols/http2_protocol.h
    serf/trunk/protocols/http2_stream.c
    serf/trunk/serf.h
    serf/trunk/serf_private.h

Modified: serf/trunk/outgoing.c
URL: 
http://svn.apache.org/viewvc/serf/trunk/outgoing.c?rev=1716165&r1=1716164&r2=1716165&view=diff
==============================================================================
--- serf/trunk/outgoing.c (original)
+++ serf/trunk/outgoing.c Tue Nov 24 15:27:11 2015
@@ -564,6 +564,8 @@ static apr_status_t reset_connection(ser
     conn->perform_write = write_to_connection;
     conn->perform_hangup = hangup_connection;
     conn->perform_teardown = NULL;
+    conn->perform_cancel_request = NULL;
+    conn->perform_prioritize_request = NULL;
 
     conn->status = APR_SUCCESS;
 
@@ -1245,6 +1247,8 @@ serf_connection_t *serf_connection_creat
     conn->perform_write = write_to_connection;
     conn->perform_hangup = hangup_connection;
     conn->perform_teardown = NULL;
+    conn->perform_cancel_request = NULL;
+    conn->perform_prioritize_request = NULL;
     conn->protocol_baton = NULL;
 
     conn->written_reqs = conn->written_reqs_tail = NULL;
@@ -1467,6 +1471,8 @@ void serf_connection_set_framing_type(
     conn->perform_read = read_from_connection;
     conn->perform_write = write_to_connection;
     conn->perform_hangup = hangup_connection;
+    conn->perform_cancel_request = NULL;
+    conn->perform_prioritize_request = NULL;
     conn->perform_teardown = NULL;
 
     switch (framing_type) {

Modified: serf/trunk/outgoing_request.c
URL: 
http://svn.apache.org/viewvc/serf/trunk/outgoing_request.c?rev=1716165&r1=1716164&r2=1716165&view=diff
==============================================================================
--- serf/trunk/outgoing_request.c (original)
+++ serf/trunk/outgoing_request.c Tue Nov 24 15:27:11 2015
@@ -101,6 +101,69 @@ apr_status_t serf__destroy_request(serf_
 {
     serf_connection_t *conn = request->conn;
 
+    /* Take REQUEST out of the dependency (priority) tree before it goes
+       away, so no other request keeps pointing at it.
+       - Requests depending on us are handed over to our own parent, or
+         become independent when we have none.
+       - We are removed from the list of dependencies of our parent.
+       See https://tools.ietf.org/html/rfc7540#section-5.3.4 */
+    if (request->depends_first) {
+        serf_request_t *parent = request->depends_on;
+        serf_request_t *r, *next;
+        apr_uint64_t total = 0;
+
+        /* Calculate total priority of our direct dependencies */
+        for (r = request->depends_first; r; r = r->depends_next) {
+            total += r->dep_priority;
+        }
+
+        for (r = request->depends_first; r; r = next) {
+            /* Fetch the sibling first; the link may be cleared below */
+            next = r->depends_next;
+
+            r->depends_on = parent;
+
+            if (!parent) {
+                /* Dependencies lose their parent, and by that their
+                   siblings. Their priority is left untouched. */
+                r->depends_next = NULL;
+            }
+            else if (total && request->dep_priority) {
+                /* Distribute our own priority over the dependencies,
+                   proportionally to their current priorities. The result
+                   never exceeds our own priority, so it fits 16 bits. */
+                apr_uint64_t scaled = request->dep_priority;
+
+                scaled = scaled * r->dep_priority / total;
+
+                /* The division may round down to 0; keep at least 1 */
+                r->dep_priority = scaled ? (apr_uint16_t)scaled : 1;
+            }
+        }
+    }
+
+    if (request->depends_on) {
+        serf_request_t **pr = &request->depends_on->depends_first;
+
+        /* Remove us from parent */
+        while (*pr && *pr != request)
+            pr = &(*pr)->depends_next;
+
+        if (*pr == request)
+            *pr = request->depends_next;
+
+        /* And append all our dependencies at the end of its list. PR may
+           refer to an empty slot here, so test before every step. */
+        while (*pr)
+            pr = &(*pr)->depends_next;
+
+        *pr = request->depends_first;
+    }
+
+    request->depends_on = NULL;
+    request->depends_first = NULL;
+    request->depends_next = NULL;
+
     if (request->writing >= SERF_WRITING_STARTED
         && request->writing < SERF_WRITING_FINISHED) {
 
@@ -136,8 +199,15 @@ apr_status_t serf__cancel_request(serf_r
         /* We actually don't care what the handler returns.
          * We have bigger matters at hand.
          */
-        (*request->handler)(request, NULL, request->handler_baton,
-                            request->respool);
+        (void)request->handler(request, NULL, request->handler_baton,
+                               request->respool);
+
+        request->handler = NULL;
+    }
+
+    if (request->conn && request->conn->perform_cancel_request) {
+        request->conn->perform_cancel_request(request,
+                                              SERF_ERROR_HTTP2_CANCEL);
     }
 
     if (*list == request) {
@@ -352,6 +422,11 @@ create_request(serf_connection_t *conn,
     request->ssltunnel = ssltunnel;
     request->next = NULL;
     request->auth_baton = NULL;
+    request->protocol_baton = NULL;
+    request->depends_on = NULL;
+    request->depends_next = NULL;
+    request->depends_first = NULL;
+    request->dep_priority = SERF_REQUEST_PRIORITY_DEFAULT;
 
     return request;
 }
@@ -476,6 +551,92 @@ apr_status_t serf_request_cancel(serf_re
 
 }
 
+/* Makes REQUEST depend on DEPENDS_ON, or detaches it from its current
+   parent when DEPENDS_ON is NULL. A non-zero PRIORITY replaces the stored
+   priority. With EXCLUSIVE set, the requests that depended on DEPENDS_ON
+   are moved to depend on REQUEST. The protocol handler is notified last.
+   Aborts if DEPENDS_ON is REQUEST or is on another connection. */
+void serf_connection_request_prioritize(serf_request_t *request,
+                                        serf_request_t *depends_on,
+                                        apr_uint16_t priority,
+                                        int exclusive)
+{
+    if (request->depends_on != depends_on) {
+        serf_request_t *r;
+
+        /* DEPENDS_ON is allowed to be NULL; only verify a real target */
+        if (depends_on
+            && (depends_on->conn != request->conn || depends_on == request))
+            abort();
+
+        /* If we are indirectly made dependent on ourself, we first
+           reprioritize the descendant on our current parent. See
+           https://tools.ietf.org/html/rfc7540#section-5.3.3
+
+           If a stream is made dependent on one of its own dependencies, the
+           formerly dependent stream is first moved to be dependent on the
+           reprioritized stream's previous parent.  The moved dependency
+           retains its weight. */
+        r = depends_on;
+        while (r && r != request)
+            r = r->depends_on;
+
+        if (r == request) {
+            serf_connection_request_prioritize(depends_on,
+                                               request->depends_on,
+                                               depends_on->dep_priority,
+                                               false /* exclusive */);
+        }
+
+        if (request->depends_on) {
+            /* Ok, we can now update our dependency: leave the old parent */
+            serf_request_t **pr = &request->depends_on->depends_first;
+
+            while (*pr && *pr != request)
+                pr = &(*pr)->depends_next;
+
+            if (*pr)
+                *pr = request->depends_next;
+        }
+
+        request->depends_on = depends_on;
+
+        if (depends_on && exclusive) {
+            serf_request_t *last = NULL;
+
+            /* Everything that depended on DEPENDS_ON now depends on us */
+            for (r = depends_on->depends_first; r; r = r->depends_next) {
+                r->depends_on = request;
+                last = r;
+            }
+
+            if (last) {
+                /* Put these in front of the dependencies we already had */
+                last->depends_next = request->depends_first;
+                request->depends_first = depends_on->depends_first;
+            }
+            /* Which makes us the only direct dependency of DEPENDS_ON */
+            request->depends_next = NULL;
+            depends_on->depends_first = request;
+        }
+        else if (depends_on) {
+            /* Just become the first dependency of DEPENDS_ON */
+            request->depends_next = depends_on->depends_first;
+            depends_on->depends_first = request;
+        }
+        else
+            request->depends_next = NULL;
+    }
+
+    if (priority)
+        request->dep_priority = priority;
+
+    /* And now tell the protocol about this */
+    if (request->conn->perform_prioritize_request)
+        request->conn->perform_prioritize_request(request, exclusive != 0);
+}
+
+
 apr_status_t serf_request_is_written(serf_request_t *request)
 {
     if (request->writing >= SERF_WRITING_FINISHED)

Modified: serf/trunk/protocols/http2_protocol.c
URL: 
http://svn.apache.org/viewvc/serf/trunk/protocols/http2_protocol.c?rev=1716165&r1=1716164&r2=1716165&view=diff
==============================================================================
--- serf/trunk/protocols/http2_protocol.c (original)
+++ serf/trunk/protocols/http2_protocol.c Tue Nov 24 15:27:11 2015
@@ -58,6 +58,12 @@ http2_incoming_teardown(serf_incoming_t
 static apr_status_t
 http2_process(serf_http2_protocol_t *h2);
 
+static void
+http2_cancel_request(serf_request_t *rq, apr_status_t reason);
+
+static void
+http2_prioritize_request(serf_request_t *rq, bool exclusive);
+
 static serf_bucket_t *
 serf_bucket_create_numberv(serf_bucket_alloc_t *allocator, const char *format,
                            ...)
@@ -280,6 +286,8 @@ void serf__http2_protocol_init(serf_conn
     conn->perform_write = http2_outgoing_write;
     conn->perform_hangup = http2_outgoing_hangup;
     conn->perform_teardown = http2_outgoing_teardown;
+    conn->perform_cancel_request = http2_cancel_request;
+    conn->perform_prioritize_request = http2_prioritize_request;
     conn->protocol_baton = h2;
 
     /* Disable HTTP/1.1 guessing that affects writability */
@@ -1670,6 +1678,28 @@ http2_outgoing_teardown(serf_connection_
     conn->protocol_baton = NULL;
 }
 
+/* Connection hook: hands the cancellation of RQ to its http/2 stream */
+static void http2_cancel_request(serf_request_t *rq, apr_status_t reason)
+{
+    serf_http2_stream_t *stream = rq->protocol_baton;
+
+    /* Only when the connection has a protocol baton and RQ has a stream */
+    if (rq->conn && rq->conn->protocol_baton && stream) {
+        serf_http2__stream_cancel_request(stream, rq, reason);
+    }
+}
+
+/* Connection hook: hands a priority change of RQ to its http/2 stream */
+static void http2_prioritize_request(serf_request_t *rq, bool exclusive)
+{
+    serf_http2_stream_t *stream = rq->protocol_baton;
+
+    /* Only when the connection has a protocol baton and RQ has a stream */
+    if (rq->conn && rq->conn->protocol_baton && stream) {
+        serf_http2__stream_prioritize_request(stream, rq, exclusive);
+    }
+}
+
 static apr_status_t
 http2_incoming_read(serf_incoming_t *client)
 {

Modified: serf/trunk/protocols/http2_protocol.h
URL: 
http://svn.apache.org/viewvc/serf/trunk/protocols/http2_protocol.h?rev=1716165&r1=1716164&r2=1716165&view=diff
==============================================================================
--- serf/trunk/protocols/http2_protocol.h (original)
+++ serf/trunk/protocols/http2_protocol.h Tue Nov 24 15:27:11 2015
@@ -222,7 +222,14 @@ void serf_http2__ensure_writable(serf_ht
 apr_status_t
 serf_http2__stream_reset(serf_http2_stream_t *stream,
                          apr_status_t reason,
-                         int local_reset);
+                         bool local_reset);
+
+void serf_http2__stream_cancel_request(serf_http2_stream_t *stream,
+                                       serf_request_t *rq,
+                                       apr_status_t reason);
+
+void serf_http2__stream_prioritize_request(serf_http2_stream_t *stream,
+                                           serf_request_t *rq, bool exclusive);
 
 serf_bucket_t *
 serf_http2__stream_handle_hpack(serf_http2_stream_t *stream,

Modified: serf/trunk/protocols/http2_stream.c
URL: 
http://svn.apache.org/viewvc/serf/trunk/protocols/http2_stream.c?rev=1716165&r1=1716164&r2=1716165&view=diff
==============================================================================
--- serf/trunk/protocols/http2_stream.c (original)
+++ serf/trunk/protocols/http2_stream.c Tue Nov 24 15:27:11 2015
@@ -97,7 +97,8 @@ serf_http2__stream_cleanup(serf_http2_st
 static apr_status_t stream_send_headers(serf_http2_stream_t *stream,
                                         serf_bucket_t *hpack,
                                         apr_size_t max_payload_size,
-                                        bool end_stream)
+                                        bool end_stream,
+                                        bool priority)
 {
     apr_status_t status;
     bool first_frame = true;
@@ -138,7 +139,10 @@ static apr_status_t stream_send_headers(
                                                 : 0)
                                                | ((hpack != NULL)
                                                   ? 0
-                                                  : HTTP2_FLAG_END_HEADERS),
+                                                  : HTTP2_FLAG_END_HEADERS)
+                                               | (priority
+                                                  ? HTTP2_FLAG_PRIORITY
+                                                  : 0),
                                                &stream->streamid,
                                                serf_http2__allocate_stream_id,
                                                stream,
@@ -320,12 +324,14 @@ serf_http2__stream_setup_next_request(se
     serf_bucket_t *hpack;
     serf_bucket_t *body;
     bool end_stream;
+    bool priority = false;
 
     SERF_H2_assert(request != NULL);
     if (!request)
         return APR_EGENERAL;
 
     stream->data->request = request;
+    request->protocol_baton = stream;
 
     if (!request->req_bkt) {
         status = serf__setup_request(request);
@@ -353,6 +359,35 @@ serf_http2__stream_setup_next_request(se
     if (status)
         return status;
 
+    if (request->depends_on && request->depends_on->protocol_baton)
+    {
+        serf_http2_stream_t *ds = request->depends_on->protocol_baton;
+
+        if (ds->streamid >= 0) {
+            serf_bucket_t *agg;
+            unsigned char priority_data[5];
+
+            agg = serf_bucket_aggregate_create(request->allocator);
+
+            priority_data[0] = (ds->streamid >> 24) & 0x7F;
+            /* bit 7 of [0] is the exclusive flag */
+            priority_data[1] = (ds->streamid >> 16) & 0xFF;
+            priority_data[2] = (ds->streamid >> 8) & 0xFF;
+            priority_data[3] = ds->streamid & 0xFF;
+            priority_data[4] = request->dep_priority >> 8;
+
+            serf_bucket_aggregate_append(
+                agg,
+                serf_bucket_simple_copy_create((void *)priority_data,
+                                               5, request->allocator));
+
+            serf_bucket_aggregate_append(agg, hpack);
+            hpack = agg;
+
+            priority = true;
+        }
+    }
+
     if (!body) {
         serf_bucket_destroy(request->req_bkt);
         request->req_bkt = NULL;
@@ -362,7 +397,7 @@ serf_http2__stream_setup_next_request(se
         end_stream = false;
 
     status = stream_send_headers(stream, hpack, max_payload_size,
-                                 end_stream);
+                                 end_stream, priority);
     if (status)
         return status;
 
@@ -388,7 +423,7 @@ serf_http2__stream_setup_next_request(se
 apr_status_t
 serf_http2__stream_reset(serf_http2_stream_t *stream,
                          apr_status_t reason,
-                         int local_reset)
+                         bool local_reset)
 {
     stream->status = H2S_CLOSED;
 
@@ -403,6 +438,47 @@ serf_http2__stream_reset(serf_http2_stre
     return APR_SUCCESS;
 }
 
+/* Cancels the request RQ that is handled by STREAM: resets the stream with
+   REASON and clears the request pointer kept in the stream's data.
+
+   Nothing happens for a stream with a negative id or that is closed. A
+   REASON outside the http/2 error range is replaced by CANCEL. */
+void serf_http2__stream_cancel_request(serf_http2_stream_t *stream,
+                                       serf_request_t *rq,
+                                       apr_status_t reason)
+{
+    /* Never hit the wire, or already detached */
+    if (stream->streamid < 0 || stream->status == H2S_CLOSED)
+        return;
+
+    if (reason < SERF_ERROR_HTTP2_NO_ERROR
+        || reason > SERF_ERROR_HTTP2_HTTP_1_1_REQUIRED)
+        reason = SERF_ERROR_HTTP2_CANCEL;
+
+    /* Let the other party know we don't want anything */
+    serf_http2__stream_reset(stream, reason, true);
+
+    /* The stream should no longer refer to the request */
+    if (stream->data && stream->data->request)
+        stream->data->request = NULL;
+
+    /* NOTE(review): response_agg is left alone on purpose; touching it is
+       typically not safe here, as we might still be reading from it */
+}
+
+/* Called when the priority of RQ, handled by STREAM, changed. Currently a
+   no-op: the priority is only sent when the request is set up. */
+void serf_http2__stream_prioritize_request(serf_http2_stream_t *stream,
+                                           serf_request_t *rq,
+                                           bool exclusive)
+{
+    /* Never hit the wire, or already detached */
+    if (stream->streamid < 0 || stream->status == H2S_CLOSED)
+        return;
+    /* Ignore for now. We start by handling this at setup */
+}
+
+
 static apr_status_t
 stream_response_eof(void *baton,
                     serf_bucket_t *aggregate_bucket)
@@ -476,7 +552,7 @@ http2_stream_enqueue_response(serf_incom
 
     status = stream_send_headers(stream, hpack,
                                  serf_http2__max_payload_size(stream->h2),
-                                 false);
+                                 false /* eos */, false /* priority */);
 
     if (status)
         return status;

Modified: serf/trunk/serf.h
URL: 
http://svn.apache.org/viewvc/serf/trunk/serf.h?rev=1716165&r1=1716164&r2=1716165&view=diff
==============================================================================
--- serf/trunk/serf.h (original)
+++ serf/trunk/serf.h Tue Nov 24 15:27:11 2015
@@ -722,6 +722,39 @@ serf_request_t *serf_connection_priority
     serf_request_setup_t setup,
     void *setup_baton);
 
+/** The default request priority */
+#define SERF_REQUEST_PRIORITY_DEFAULT 0x1000
+
+/**
+ * Updates the request's priority information. Some protocol implementations,
+ * such as HTTP/2 may use this information for response scheduling. The
+ * actual behavior depends on the server, intermediate proxies and of course
+ * the protocol implementation.
+ *
+ * It is recommended to prioritize a request before sending it to the server,
+ * as that avoids race conditions and receiving unwanted results.
+ *
+ * If @a depends_on is set, then the request is marked as dependent on
+ * @a depends_on, and the result of @a request will only be received if
+ * no progress can be made on @a depends_on itself.
+ *
+ * @a priority is used to relatively prioritize multiple dependencies on the
+ * same target. Passing 0 will keep the original priority. In case of HTTP/2
+ * this value is mapped to an 8 bit value by ignoring the lowest 8 bits.
+ *
+ * By default a request is created at priority SERF_REQUEST_PRIORITY_DEFAULT.
+ *
+ * If @a exclusive is set to TRUE, then all existing dependencies on @a
+ * depends_on will be updated to now depend on @a request, to make @a
+ * request the only dependency of @a depends_on. When FALSE, @a request will
+ * just be added as a dependency.
+ *
+ * @since New in 1.4.
+ */
+void serf_connection_request_prioritize(serf_request_t *request,
+                                        serf_request_t *depends_on,
+                                        apr_uint16_t priority,
+                                        int exclusive);
 
 /** Returns detected network latency for the @a conn connection. Negative
  *  value means that latency is unknwon.

Modified: serf/trunk/serf_private.h
URL: 
http://svn.apache.org/viewvc/serf/trunk/serf_private.h?rev=1716165&r1=1716164&r2=1716165&view=diff
==============================================================================
--- serf/trunk/serf_private.h (original)
+++ serf/trunk/serf_private.h Tue Nov 24 15:27:11 2015
@@ -235,6 +235,11 @@ struct serf_request_t {
     /* 1 if this is a request to setup a SSL tunnel, 0 for normal requests. */
     int ssltunnel;
 
+    serf_request_t *depends_on;      /* On what request do we depend */
+    serf_request_t *depends_next;    /* Next dependency on parent*/
+    serf_request_t *depends_first;   /* First dependency on us */
+    apr_uint16_t dep_priority;
+
     /* This baton is currently only used for digest authentication, which
        needs access to the uri of the request in the response handler.
        If serf_request_t is replaced by a serf_http_request_t in the future,
@@ -242,6 +247,10 @@ struct serf_request_t {
        anymore. */
     void *auth_baton;
 
+    /* Protocol handlers are free to set this baton. They typically use it
+       for identifying or storing related information */
+    void *protocol_baton;
+
     struct serf_request_t *next;
 };
 
@@ -581,6 +590,12 @@ struct serf_connection_t {
     void (*perform_teardown)(serf_connection_t *conn);
     void *protocol_baton;
 
+    /* Request callbacks. NULL unless handled by the protocol implementation */
+    void (*perform_cancel_request)(serf_request_t *request,
+                                   apr_status_t reason);
+    void (*perform_prioritize_request)(serf_request_t *request,
+                                       bool exclusive);
+
     /* Configuration shared with buckets and authn plugins */
     serf_config_t *config;
 };


Reply via email to