Author: rhuijben
Date: Mon Nov 16 14:39:39 2015
New Revision: 1714598

URL: http://svn.apache.org/viewvc?rev=1714598&view=rev
Log:
Add a callback to the incoming request implementation to allow enqueueing
the response in a protocol dependent way. Extend http/2 incoming request
handling some more. Fix protocol default value that caused the original
protocol error before this patch.

* incoming.c
  (http1_enqueue_reponse): New function.
  (serf_incoming_response_create): Call enqueue function to allow overriding
    behavior in a protocol implementation.
  (serf__incoming_request_create): New function extracted from...
  (read_from_client): ... here.

* protocols/http2_protocol.c
  (serf_http2__setup_incoming_request): New function.

* protocols/http2_protocol.h
  (HTTP2_WINDOW_MAX_ALLOWED): Fix default. Only highest bit of 32 bit int
     is not allowed.
  (serf_http2__setup_incoming_request): New function.

* protocols/http2_stream.c
  (serf_http2_stream_data_t): Also handle streams for incoming requests.
  (http2_stream_enqueue_response): New function.
  (stream_setup_response): Setup incoming requests when in server mode.
  (serf_http2__stream_processor): Process incoming requests when in
    server mode.

* serf_private.h
  (serf_incoming_request_t): Add callback.
  (serf__incoming_request_create): New function.

* test/test_server.c
  (test_listen_http2): Tweak expected result.

Modified:
    serf/trunk/incoming.c
    serf/trunk/protocols/http2_protocol.c
    serf/trunk/protocols/http2_protocol.h
    serf/trunk/protocols/http2_stream.c
    serf/trunk/serf_private.h
    serf/trunk/test/test_server.c

Modified: serf/trunk/incoming.c
URL: 
http://svn.apache.org/viewvc/serf/trunk/incoming.c?rev=1714598&r1=1714597&r2=1714598&view=diff
==============================================================================
--- serf/trunk/incoming.c (original)
+++ serf/trunk/incoming.c Mon Nov 16 14:39:39 2015
@@ -126,6 +126,25 @@ static apr_status_t response_finished(vo
     return status;
 }
 
+static apr_status_t http1_enqueue_reponse(serf_incoming_request_t *request,
+                                          void *enqueue_baton,
+                                          serf_bucket_t *bucket)
+{
+    serf_bucket_aggregate_append(request->incoming->ostream_tail,
+                                 serf__bucket_event_create(bucket,
+                                                           request,
+                                                           NULL,
+                                                           NULL,
+                                                           response_finished,
+                                                           bucket->allocator));
+
+    /* Want write event */
+    request->incoming->dirty_conn = true;
+    request->incoming->ctx->dirty_pollset = true;
+
+    return APR_SUCCESS;
+}
+
 apr_status_t serf_incoming_response_create(serf_incoming_request_t *request)
 {
     apr_status_t status;
@@ -146,20 +165,7 @@ apr_status_t serf_incoming_response_crea
 
     request->response_written = true;
 
-    /* ### Needs work for other protocols */
-    serf_bucket_aggregate_append(request->incoming->ostream_tail,
-                                 serf__bucket_event_create(bucket,
-                                                           request,
-                                                           NULL,
-                                                           NULL,
-                                                           response_finished,
-                                                           alloc));
-
-    /* Want write event */
-    request->incoming->dirty_conn = true;
-    request->incoming->ctx->dirty_pollset = true;
-
-    return APR_SUCCESS;
+    return request->enqueue_response(request, request->enqueue_baton, bucket);
 }
 
 apr_status_t perform_peek_protocol(serf_incoming_t *client)
@@ -255,6 +261,22 @@ apr_status_t perform_peek_protocol(serf_
     return status;
 }
 
+serf_incoming_request_t *serf__incoming_request_create(serf_incoming_t *client)
+{
+    serf_incoming_request_t *rq;
+    serf_bucket_t *read_bkt;
+
+    rq = serf_bucket_mem_calloc(client->allocator, sizeof(*rq));
+
+    apr_pool_create(&rq->pool, client->pool);
+    rq->incoming = client;
+
+    rq->enqueue_response = http1_enqueue_reponse;
+    rq->enqueue_baton = rq;
+
+    return rq;
+}
+
 static apr_status_t read_from_client(serf_incoming_t *client)
 {
     apr_status_t status;
@@ -274,12 +296,9 @@ static apr_status_t read_from_client(ser
     do {
         rq = client->current_request;
         if (!rq) {
-
             serf_bucket_t *read_bkt;
-            rq = serf_bucket_mem_calloc(client->allocator, sizeof(*rq));
 
-            apr_pool_create(&rq->pool, client->pool);
-            rq->incoming = client;
+            rq = serf__incoming_request_create(client);
 
             if (client->proto_peek_bkt) {
                 read_bkt = client->proto_peek_bkt;

Modified: serf/trunk/protocols/http2_protocol.c
URL: 
http://svn.apache.org/viewvc/serf/trunk/protocols/http2_protocol.c?rev=1714598&r1=1714597&r2=1714598&view=diff
==============================================================================
--- serf/trunk/protocols/http2_protocol.c (original)
+++ serf/trunk/protocols/http2_protocol.c Mon Nov 16 14:39:39 2015
@@ -1869,3 +1869,19 @@ serf_http2__enqueue_stream_reset(serf_ht
                                             h2->allocator),
             TRUE);
 }
+
+apr_status_t
+serf_http2__setup_incoming_request(serf_incoming_request_t **in_request,
+                                   serf_incoming_request_setup_t *req_setup,
+                                   void **req_setup_baton,
+                                   serf_http2_protocol_t *h2)
+{
+    if (!h2->client)
+        return SERF_ERROR_HTTP2_PROTOCOL_ERROR;
+
+    *in_request = serf__incoming_request_create(h2->client);
+    *req_setup = h2->client->req_setup;
+    *req_setup_baton = h2->client->req_setup_baton;
+
+    return APR_SUCCESS;
+}

Modified: serf/trunk/protocols/http2_protocol.h
URL: 
http://svn.apache.org/viewvc/serf/trunk/protocols/http2_protocol.h?rev=1714598&r1=1714597&r2=1714598&view=diff
==============================================================================
--- serf/trunk/protocols/http2_protocol.h (original)
+++ serf/trunk/protocols/http2_protocol.h Mon Nov 16 14:39:39 2015
@@ -56,7 +56,7 @@ extern "C" {
 
 #define HTTP2_SETTING_SIZE              6
 
-#define HTTP2_WINDOW_MAX_ALLOWED        0x7FFFFFF
+#define HTTP2_WINDOW_MAX_ALLOWED        0x7FFFFFFF
 
 /* Frame type is an 8 bit unsigned integer */
 
@@ -199,6 +199,12 @@ serf_http2__stream_setup_next_request(se
                                       serf_hpack_table_t *hpack_tbl);
 
 apr_status_t
+serf_http2__setup_incoming_request(serf_incoming_request_t **in_request,
+                                   serf_incoming_request_setup_t *req_setup,
+                                   void **req_setup_baton,
+                                   serf_http2_protocol_t *h2);
+
+apr_status_t
 serf_http2__stream_reset(serf_http2_stream_t *stream,
                          apr_status_t reason,
                          int local_reset);

Modified: serf/trunk/protocols/http2_stream.c
URL: 
http://svn.apache.org/viewvc/serf/trunk/protocols/http2_stream.c?rev=1714598&r1=1714597&r2=1714598&view=diff
==============================================================================
--- serf/trunk/protocols/http2_stream.c (original)
+++ serf/trunk/protocols/http2_stream.c Mon Nov 16 14:39:39 2015
@@ -32,6 +32,7 @@
 struct serf_http2_stream_data_t
 {
     serf_request_t *request; /* May be NULL as streams may outlive requests */
+    serf_incoming_request_t *in_request;
     serf_bucket_t *response_agg;
 };
 
@@ -53,6 +54,7 @@ serf_http2__stream_create(serf_http2_pro
     /* Delay creating this? */
     stream->data = serf_bucket_mem_alloc(alloc, sizeof(*stream->data));
     stream->data->request = NULL;
+    stream->data->in_request = NULL;
     stream->data->response_agg = NULL;
 
     stream->lr_window = lr_window;
@@ -233,32 +235,75 @@ stream_response_eof(void *baton,
     }
 }
 
-static void
+static apr_status_t
+http2_stream_enqueue_response(serf_incoming_request_t *request,
+                              void *enqueue_baton,
+                              serf_bucket_t *response_bkt)
+{
+    serf_http2_stream_t *stream = enqueue_baton;
+
+    return APR_ENOTIMPL;
+}
+
+static apr_status_t
 stream_setup_response(serf_http2_stream_t *stream,
                       serf_config_t *config)
 {
-    serf_request_t *request;
     serf_bucket_t *agg;
+    apr_status_t status;
 
     agg = serf_bucket_aggregate_create(stream->alloc);
     serf_bucket_aggregate_hold_open(agg, stream_response_eof, stream);
 
     serf_bucket_set_config(agg, config);
+    stream->data->response_agg = agg;
 
-    request = stream->data->request;
+    if (stream->data->request) {
+        serf_request_t *request = stream->data->request;
 
-    if (!request)
-        return;
+        if (!request->resp_bkt) {
+            apr_pool_t *scratch_pool = request->respool; /* ### Pass scratch pool */
 
-    if (!request->resp_bkt) {
-        apr_pool_t *scratch_pool = request->respool; /* ### Pass scratch pool */
+            request->resp_bkt = request->acceptor(request, agg,
+                                                  request->acceptor_baton,
+                                                  scratch_pool);
+        }
+    }
+    else {
+        serf_incoming_request_t *in_request = stream->data->in_request;
+
+        if (!in_request) {
+            serf_incoming_request_setup_t req_setup;
+            void *req_setup_baton;
+
+            status = serf_http2__setup_incoming_request(&in_request, &req_setup,
+                                                        &req_setup_baton,
+                                                        stream->h2);
+
+            if (status)
+                return status;
+
+            stream->data->in_request = in_request;
 
-        request->resp_bkt = request->acceptor(request, agg,
-                                              request->acceptor_baton,
-                                              scratch_pool);
+            status = req_setup(&in_request->req_bkt, agg,
+                               in_request, req_setup_baton,
+                               &in_request->handler,
+                               &in_request->handler_baton,
+                               &in_request->response_setup,
+                               &in_request->response_setup_baton,
+                               in_request->pool);
+
+            if (status)
+                return status;
+
+            stream->status = H2S_OPEN;
+
+            in_request->enqueue_response = http2_stream_enqueue_response;
+            in_request->enqueue_baton = stream;
+        }
     }
 
-    stream->data->response_agg = agg;
+    return APR_SUCCESS;
 }
 
 static apr_status_t
@@ -403,17 +448,17 @@ serf_http2__stream_processor(void *baton
 {
     serf_http2_stream_t *stream = baton;
     apr_status_t status = APR_SUCCESS;
-    serf_request_t *request = stream->data->request;
 
     SERF_H2_assert(stream->data->response_agg != NULL);
 
-    if (request) {
+    if (stream->data->request) {
+        serf_request_t *request = stream->data->request;
 
         SERF_H2_assert(request->resp_bkt != NULL);
 
-        status = stream->data->request->handler(request, request->resp_bkt,
-                                                request->handler_baton,
-                                                request->respool);
+        status = request->handler(request, request->resp_bkt,
+                                  request->handler_baton,
+                                  request->respool);
 
         if (!APR_STATUS_IS_EOF(status)
             && !SERF_BUCKET_READ_ERROR(status))
@@ -471,6 +516,37 @@ serf_http2__stream_processor(void *baton
 
         status = APR_SUCCESS;
     }
+    else if (stream->data->in_request) {
+        serf_incoming_request_t *request = stream->data->in_request;
+
+        SERF_H2_assert(request->req_bkt != NULL);
+
+        status = request->handler(request, request->req_bkt,
+                                  request->handler_baton,
+                                  request->pool);
+
+        if (!APR_STATUS_IS_EOF(status)
+            && !SERF_BUCKET_READ_ERROR(status))
+            return status;
+
+        if (APR_STATUS_IS_EOF(status)) {
+            apr_status_t status = serf_incoming_response_create(request);
+
+            if (status)
+                return status;
+        }
+
+        if (SERF_BUCKET_READ_ERROR(status)) {
+
+            if (stream->status != H2S_CLOSED) {
+                /* Tell the other side that we are no longer interested
+                to receive more data */
+                serf_http2__stream_reset(stream, status, TRUE);
+            }
+
+            return status;
+        }
+    }
 
     while (!status)
     {

Modified: serf/trunk/serf_private.h
URL: 
http://svn.apache.org/viewvc/serf/trunk/serf_private.h?rev=1714598&r1=1714597&r2=1714598&view=diff
==============================================================================
--- serf/trunk/serf_private.h (original)
+++ serf/trunk/serf_private.h Mon Nov 16 14:39:39 2015
@@ -197,6 +197,11 @@ struct serf_incoming_request_t
     serf_incoming_response_setup_t response_setup;
     void *response_setup_baton;
 
+    apr_status_t (*enqueue_response)(serf_incoming_request_t *request,
+                                     void *enqueue_baton,
+                                     serf_bucket_t *response);
+    void *enqueue_baton;
+
     bool request_read;
     bool response_written;
     bool response_finished;
@@ -638,6 +643,7 @@ apr_status_t serf__process_client(serf_i
 apr_status_t serf__process_listener(serf_listener_t *l);
 apr_status_t serf__incoming_update_pollset(serf_incoming_t *incoming);
 apr_status_t serf__incoming_client_flush(serf_incoming_t *client, bool pump);
+serf_incoming_request_t *serf__incoming_request_create(serf_incoming_t *client);
 
 /* from outgoing.c */
 apr_status_t serf__open_connections(serf_context_t *ctx);

Modified: serf/trunk/test/test_server.c
URL: 
http://svn.apache.org/viewvc/serf/trunk/test/test_server.c?rev=1714598&r1=1714597&r2=1714598&view=diff
==============================================================================
--- serf/trunk/test/test_server.c (original)
+++ serf/trunk/test/test_server.c Mon Nov 16 14:39:39 2015
@@ -232,12 +232,13 @@ void test_listen_http2(CuTest *tc)
                                        tb->pool);
     CuAssertIntEquals(tc, APR_SUCCESS, status);
 
-    create_new_request(tb, &handler_ctx[0], "GET", "/", 1);
-    create_new_request(tb, &handler_ctx[1], "GET", "/", 2);
+    /* Our http2 implementation doesn't support request bodies yet */
+    create_new_request(tb, &handler_ctx[0], "GET", "/", -1);
+    create_new_request(tb, &handler_ctx[1], "GET", "/", -1);
 
     status = run_client_server_loop(tb, num_requests,
                                     handler_ctx, tb->pool);
-    CuAssertIntEquals(tc, SERF_ERROR_HTTP2_PROTOCOL_ERROR, status);
+    CuAssertIntEquals(tc, APR_ENOTIMPL, status);
 }
 
 /*****************************************************************************/


Reply via email to