Author: rhuijben
Date: Tue Nov 17 12:49:11 2015
New Revision: 1714774

URL: http://svn.apache.org/viewvc?rev=1714774&view=rev
Log:
Extend fcgi code with parameter decoding. This completes the server side
request parsing. Next step will be to hook up the incoming request routing
to get us a response to send back.

* buckets/fcgi_buckets.c
  (serf_bucket_type__fcgi_params_decode): New bucket type.

* protocols/fcgi_buckets.h
  (serf_bucket_type__fcgi_params_decode): New bucket type.
  (serf__bucket_fcgi_params_decode_create): New function.

* protocols/fcgi_protocol.c
  (SERF_ERROR_FCGI_RECORD_SIZE_ERROR,
   SERF_ERROR_FCGI_PROTOCOL_ERROR): New defines. For now mapped to their
   HTTP2 equivalents.
  (fcgi_begin_request): New function.
  (fcgi_process): Handle begin request, params and stdin records.

  (fcgi_write): Return success instead of failure.
  (fcgi_hangup): New function.
  (move_to_head): New function.
  (serf_fcgi__stream_get): New function.
  (fcgi_outgoing_write,
   fcgi_outgoing_hangup,
   fcgi_outgoing_teardown): Forward to the right function.

  (fcgi_server_write,
   fcgi_server_hangup,
   fcgi_server_teardown): Forward to the right function.

* protocols/fcgi_protocol.h
  (*): Copy and paste a few structs from fcgi specs.
  (serf_fcgi_stream_t): Store id and role.
  (serf_fcgi__stream_get): New function.
  (serf_fcgi__stream_create,
   serf_fcgi__stream_processor,
   serf_fcgi__stream_handle_params,
   serf_fcgi__stream_handle_stdin): New functions.

* protocols/fcgi_stream.c
  (serf_fcgi_stream_data_t): Fill struct.
  (serf_fcgi__stream_create): Add proper init.
  (stream_agg_eof): New function.
  (serf_fcgi__stream_handle_params,
   serf_fcgi__stream_handle_stdin): New functions.
  (serf_fcgi__stream_processor): New function.

Modified:
    serf/trunk/buckets/fcgi_buckets.c
    serf/trunk/protocols/fcgi_buckets.h
    serf/trunk/protocols/fcgi_protocol.c
    serf/trunk/protocols/fcgi_protocol.h
    serf/trunk/protocols/fcgi_stream.c

Modified: serf/trunk/buckets/fcgi_buckets.c
URL: 
http://svn.apache.org/viewvc/serf/trunk/buckets/fcgi_buckets.c?rev=1714774&r1=1714773&r2=1714774&view=diff
==============================================================================
--- serf/trunk/buckets/fcgi_buckets.c (original)
+++ serf/trunk/buckets/fcgi_buckets.c Tue Nov 17 12:49:11 2015
@@ -259,8 +259,346 @@ extern const serf_bucket_type_t serf_buc
     serf_fcgi_unframe_get_remaining,
     serf_fcgi_unframe_set_config
 };
+/* ==================================================================== */
+
+/* Per-bucket state of the FCGI params decode bucket. The context is
+   allocated zero-filled by serf__bucket_fcgi_params_decode_create(), so
+   decoding starts in DS_SIZES with nothing buffered. */
+typedef struct fcgi_params_decode_ctx_t
+{
+    /* Bucket the encoded name-value pairs are read from */
+    serf_bucket_t *stream;
+
+    /* NOTE(review): not referenced by any code visible in this change */
+    const char *last_data;
+    apr_size_t last_len;
+
+    /* Decoded lengths of the key and value of the pair being read */
+    apr_size_t key_sz;
+    apr_size_t val_sz;
+
+    /* Which part of a name-value pair is currently being read */
+    enum fcgi_param_decode_status_t
+    {
+        DS_SIZES = 0,   /* the 2, 5 or 8 length-prefix bytes */
+        DS_KEY,         /* key_sz bytes of key data */
+        DS_VALUE,       /* val_sz bytes of value data */
+    } state;
+
+    /* Length-prefix bytes collected so far while in DS_SIZES */
+    char size_buffer[8];
+    /* Number of bytes collected so far for the current state
+       (in size_buffer, key or val) */
+    apr_size_t tmp_size;
+
+    /* Buffers for the pair being read; allocated when the first read for
+       that part completes and released once the pair is handled */
+    char *key;
+    char *val;
+
+    /* Values captured by fcgi_handle_keypair() for the "METHOD" and
+       "REQUEST_URI" keys; used to build the request line */
+    const char *method;
+    const char *path;
+
+    /* Headers bucket collecting all other pairs; created on first use */
+    serf_bucket_t *headers;
+
+} fcgi_params_decode_ctx_t;
+
+/* Creates a params decode bucket that reads FCGI name-value pairs
+   from STREAM. The bucket and its context are allocated from ALLOC. */
+serf_bucket_t *
+serf__bucket_fcgi_params_decode_create(serf_bucket_t *stream,
+                                       serf_bucket_alloc_t *alloc)
+{
+    /* Zero-filled, so the decoder starts in DS_SIZES with empty buffers */
+    fcgi_params_decode_ctx_t *decode_ctx =
+        serf_bucket_mem_calloc(alloc, sizeof(*decode_ctx));
+
+    decode_ctx->stream = stream;
+
+    return serf_bucket_create(&serf_bucket_type__fcgi_params_decode,
+                              alloc, decode_ctx);
+}
+
+/* Returns the total number of length-prefix bytes (2, 5 or 8) that the
+   bytes buffered so far in ctx->size_buffer show to be needed. A length
+   whose first byte has the high bit set takes four bytes, otherwise one.
+   With nothing buffered the minimum of two bytes is assumed. */
+static apr_size_t size_data_requested(fcgi_params_decode_ctx_t *ctx)
+{
+    const char *prefix = ctx->size_buffer;
+    apr_size_t have = ctx->tmp_size;
+
+    if (have < 1)
+        return 2;
+
+    if (prefix[0] & 0x80) {
+        /* Four byte key length; the value length starts at offset 4 */
+        if (have > 4 && (prefix[4] & 0x80))
+            return 8;
+
+        return 5;
+    }
+
+    /* One byte key length; the value length starts at offset 1 */
+    if (have >= 2 && (prefix[1] & 0x80))
+        return 5;
+
+    return 2;
+}
+
+/* Consumes the completed key/value pair stored in the decode context of
+   BUCKET and takes ownership of both buffers.
+
+   - "HTTP_<name>" (unless <name> starts with "_FCGI_") is stored as
+     header <name> with every '_' replaced by '-'.
+   - "METHOD" and "REQUEST_URI" values are kept in the context for the
+     request line; only the key buffer is released.
+   - Every other key is stored as a header prefixed with "_FCGI_".
+
+   The key buffer is allocated with 6 spare bytes, which is what makes
+   the in-place "_FCGI_" prefixing possible. */
+void fcgi_handle_keypair(serf_bucket_t *bucket)
+{
+    fcgi_params_decode_ctx_t *ctx = bucket->data;
+    char *key = ctx->key;
+    char *val = ctx->val;
+
+    /* The buffers are ours now; the context must not free them again */
+    ctx->key = NULL;
+    ctx->val = NULL;
+
+    if (!ctx->headers)
+        ctx->headers = serf_bucket_headers_create(bucket->allocator);
+
+    if (strncasecmp(key, "HTTP_", 5) == 0
+        && strncasecmp(key + 5, "_FCGI_", 6) != 0)
+    {
+        apr_size_t i;
+
+        /* Strip "HTTP_" first, so the loop below only walks the bytes
+           that are still part of the key (+1 moves the terminator) */
+        ctx->key_sz -= 5;
+        memmove(key, key + 5, ctx->key_sz + 1);
+        for (i = 0; i < ctx->key_sz; i++) {
+            if (key[i] == '_')
+                key[i] = '-';
+        }
+    }
+    /* NOTE(review): CGI names this variable REQUEST_METHOD; the plain
+       "METHOD" comparison is kept as it was - confirm against the sender */
+    else if (ctx->key_sz == 6 && !strcasecmp(key, "METHOD"))
+    {
+        /* Release the value of an earlier duplicate instead of leaking it */
+        if (ctx->method)
+            serf_bucket_mem_free(bucket->allocator, (void *)ctx->method);
+
+        ctx->method = val;
+        serf_bucket_mem_free(bucket->allocator, key);
+        return;
+    }
+    else if (ctx->key_sz == 11 && !strcasecmp(key, "REQUEST_URI"))
+    {
+        /* Release the value of an earlier duplicate instead of leaking it */
+        if (ctx->path)
+            serf_bucket_mem_free(bucket->allocator, (void *)ctx->path);
+
+        ctx->path = val;
+        serf_bucket_mem_free(bucket->allocator, key);
+        return;
+    }
+    else
+    {
+        /* Make room in front of the key (including its terminator) and
+           insert the marker prefix */
+        memmove(key + 6, key, ctx->key_sz + 1);
+        memcpy(key, "_FCGI_", 6);
+        ctx->key_sz += 6;
+    }
+
+    /* Both TRUE flags ask the headers bucket to copy, so our buffers can
+       be released right away */
+    serf_bucket_headers_setx(ctx->headers,
+                             key, ctx->key_sz, TRUE,
+                             val, ctx->val_sz, TRUE);
+    serf_bucket_mem_free(bucket->allocator, key);
+    serf_bucket_mem_free(bucket->allocator, val);
+}
+
+/* Reads as many FCGI name-value pairs from the inner stream as are
+   currently available and hands each completed pair to
+   fcgi_handle_keypair(). Progress is kept in the context, so the
+   function can be called again after a short or delayed read.
+
+   Returns APR_SUCCESS when the stream reported EOF on a pair boundary
+   (everything is decoded), SERF_ERROR_TRUNCATED_STREAM when EOF arrived
+   in the middle of a pair, and otherwise the status of the last read
+   (a read error, or a "try again" style status). */
+apr_status_t fcgi_params_decode(serf_bucket_t *bucket)
+{
+    fcgi_params_decode_ctx_t *ctx = bucket->data;
+    apr_status_t status = APR_SUCCESS;
+
+    while (status == APR_SUCCESS) {
+        apr_size_t requested;
+        const char *data;
+        const unsigned char *udata;
+        apr_size_t len;
+
+        switch (ctx->state) {
+            case DS_SIZES:
+                requested = size_data_requested(ctx);
+                status = serf_bucket_read(ctx->stream,
+                                          requested - ctx->tmp_size,
+                                          &data, &len);
+                if (SERF_BUCKET_READ_ERROR(status))
+                    return status;
+
+                /* Always collect the prefix in size_buffer, as that is
+                   the buffer size_data_requested() inspects. At most
+                   requested - tmp_size bytes were asked for, so this
+                   stays within the 8 byte buffer. */
+                if (len) {
+                    memcpy(ctx->size_buffer + ctx->tmp_size, data, len);
+                    ctx->tmp_size += len;
+                }
+
+                if (ctx->tmp_size < size_data_requested(ctx)) {
+                    /* Read again. More bytes needed for
+                       determining lengths */
+                    break;
+                }
+
+                udata = (const unsigned char*)ctx->size_buffer;
+
+                /* High bit set: 31 bit big endian length in four bytes.
+                   Otherwise the length is the single byte itself. */
+                if (udata[0] & 0x80) {
+                    ctx->key_sz = (udata[0] & 0x7F) << 24 | (udata[1] << 16)
+                                        | (udata[2] << 8) | (udata[3]);
+                    udata += 4;
+                }
+                else {
+                    ctx->key_sz = udata[0] & 0x7F;
+                    udata += 1;
+                }
+
+                if (udata[0] & 0x80) {
+                    ctx->val_sz = (udata[0] & 0x7F) << 24 | (udata[1] << 16)
+                                        | (udata[2] << 8) | (udata[3]);
+                    udata += 4;
+                }
+                else {
+                    ctx->val_sz = udata[0] & 0x7F;
+                    udata += 1;
+                }
+
+                ctx->tmp_size = 0;
+                ctx->state++;
+                break;
+            case DS_KEY:
+                /* Only ask for what is still missing; asking for the
+                   full size again would overrun the key buffer and eat
+                   into the value after a short read */
+                status = serf_bucket_read(ctx->stream,
+                                          ctx->key_sz - ctx->tmp_size,
+                                          &data, &len);
+                if (SERF_BUCKET_READ_ERROR(status))
+                    break;
+
+                if (!ctx->key) {
+                    /* +1 for the terminator, +6 so fcgi_handle_keypair()
+                       can prefix "_FCGI_" in place */
+                    ctx->key = serf_bucket_mem_alloc(bucket->allocator,
+                                                     ctx->key_sz + 1 + 6);
+                    ctx->key[ctx->key_sz] = 0;
+                }
+
+                if (len) {
+                    memcpy(ctx->key + ctx->tmp_size, data, len);
+                    ctx->tmp_size += len;
+                }
+
+                if (ctx->tmp_size == ctx->key_sz) {
+                    ctx->state++;
+                    ctx->tmp_size = 0;
+                }
+                break;
+            case DS_VALUE:
+                /* As for the key: request the remaining bytes only */
+                status = serf_bucket_read(ctx->stream,
+                                          ctx->val_sz - ctx->tmp_size,
+                                          &data, &len);
+                if (SERF_BUCKET_READ_ERROR(status))
+                    break;
+                if (!ctx->val) {
+                    ctx->val = serf_bucket_mem_alloc(bucket->allocator,
+                                                     ctx->val_sz + 1);
+                    ctx->val[ctx->val_sz] = 0;
+                }
+
+                if (len) {
+                    memcpy(ctx->val + ctx->tmp_size, data, len);
+                    ctx->tmp_size += len;
+                }
+
+                if (ctx->tmp_size == ctx->val_sz) {
+                    /* Pair complete: store it and start on the next */
+                    fcgi_handle_keypair(bucket);
+                    ctx->state = DS_SIZES;
+                    ctx->tmp_size = 0;
+                }
+                break;
+        }
+    }
+
+    if (APR_STATUS_IS_EOF(status)) {
+        /* EOF is fine between two pairs, or directly after the length
+           prefix of an entirely empty pair */
+        if ((ctx->state == DS_SIZES && !ctx->tmp_size)
+            || (ctx->state == DS_KEY && !ctx->key_sz && !ctx->val_sz))
+        {
+            return APR_SUCCESS;
+        }
+
+        return SERF_ERROR_TRUNCATED_STREAM;
+    }
+
+    return status;
+}
+
+/* Turns BUCKET into an aggregate bucket holding what was decoded so far:
+   an optional request line ("<method> <path> HTTP/2.0\r\n", written
+   when a method or a path was seen, with "GET" and "/" as fallbacks)
+   followed by the collected headers bucket, if any. The decode context
+   and any half-read key/value buffers are released. */
+static void fcgi_serialize(serf_bucket_t *bucket)
+{
+    fcgi_params_decode_ctx_t *ctx = bucket->data;
+    serf_bucket_alloc_t *alloc;
+    serf_bucket_t *part;
+
+    /* From here on bucket->data no longer refers to CTX; CTX itself stays
+       valid until it is freed at the end of this function */
+    serf_bucket_aggregate_become(bucket);
+    alloc = bucket->allocator;
+
+    if (ctx->method != NULL || ctx->path != NULL) {
+        /* Method; the simple bucket takes over the allocated string */
+        if (ctx->method != NULL)
+            part = serf_bucket_simple_own_create(ctx->method,
+                                                 strlen(ctx->method),
+                                                 alloc);
+        else
+            part = SERF_BUCKET_SIMPLE_STRING("GET", alloc);
+        serf_bucket_aggregate_append(bucket, part);
+
+        part = SERF_BUCKET_SIMPLE_STRING(" ", alloc);
+        serf_bucket_aggregate_append(bucket, part);
+
+        /* Path; again handing over the allocated string */
+        if (ctx->path != NULL)
+            part = serf_bucket_simple_own_create(ctx->path,
+                                                 strlen(ctx->path),
+                                                 alloc);
+        else
+            part = SERF_BUCKET_SIMPLE_STRING("/", alloc);
+        serf_bucket_aggregate_append(bucket, part);
+
+        part = SERF_BUCKET_SIMPLE_STRING(" HTTP/2.0\r\n", alloc);
+        serf_bucket_aggregate_append(bucket, part);
+    }
+
+    if (ctx->headers != NULL)
+        serf_bucket_aggregate_append(bucket, ctx->headers);
+
+    /* Buffers of a pair that was never completed */
+    if (ctx->key != NULL)
+        serf_bucket_mem_free(alloc, ctx->key);
+    if (ctx->val != NULL)
+        serf_bucket_mem_free(alloc, ctx->val);
+
+    serf_bucket_mem_free(alloc, ctx);
+}
+
+/* Read callback of the params decode bucket. Nothing is returned until
+   all parameters have been decoded; until then *LEN is set to 0 and the
+   status of fcgi_params_decode() is passed on. */
+static apr_status_t fcgi_params_decode_read(serf_bucket_t *bucket,
+                                            apr_size_t requested,
+                                            const char **data,
+                                            apr_size_t *len)
+{
+    apr_status_t status = fcgi_params_decode(bucket);
+
+    if (status != APR_SUCCESS) {
+        *len = 0;
+        return status;
+    }
+
+    /* fcgi_serialize() replaces this bucket's type, so the call below
+       goes to the read function of the new type instead of recursing */
+    fcgi_serialize(bucket);
+
+    return bucket->type->read(bucket, requested, data, len);
+}
+
+/* Peek callback of the params decode bucket. As with read, no data is
+   exposed until all parameters have been decoded; until then *LEN is
+   set to 0 and the status of fcgi_params_decode() is returned. */
+static apr_status_t fcgi_params_decode_peek(serf_bucket_t *bucket,
+                                            const char **data,
+                                            apr_size_t *len)
+{
+    apr_status_t status;
 
+    status = fcgi_params_decode(bucket);
 
+    if (status) {
+        *len = 0;
+        return status;
+    }
+
+    /* fcgi_serialize() replaces this bucket's type, so the call below
+       goes to the peek function of the new type instead of recursing */
+    fcgi_serialize(bucket);
+    return bucket->type->peek(bucket, data, len);
+}
+
+/* Destroy callback of the params decode bucket. */
+static void fcgi_params_decode_destroy(serf_bucket_t *bucket)
+{
+    /* Serializing releases the decode context and switches the bucket
+       type, so the destroy below is the one of the new type */
+    fcgi_serialize(bucket);
+
+    bucket->type->destroy(bucket);
+}
+
+
+/* Bucket type of the params decode bucket. Only read, peek and destroy
+   are implemented here; every other slot uses a serf default. */
+extern const serf_bucket_type_t serf_bucket_type__fcgi_params_decode =
+{
+    "FCGI-PARAMS_DECODE",
+    fcgi_params_decode_read,
+    serf_default_readline,
+    serf_default_read_iovec,
+    serf_default_read_for_sendfile,
+    serf_buckets_are_v2,
+    fcgi_params_decode_peek,
+    fcgi_params_decode_destroy,
+    serf_default_read_bucket,
+    serf_default_get_remaining,
+    serf_default_ignore_config
+};
 /* ==================================================================== */
 
 serf_bucket_t *
@@ -327,3 +665,4 @@ extern const serf_bucket_type_t serf_buc
     serf_fcgi_frame_get_remaining,
     serf_fcgi_frame_set_config
 };
+

Modified: serf/trunk/protocols/fcgi_buckets.h
URL: 
http://svn.apache.org/viewvc/serf/trunk/protocols/fcgi_buckets.h?rev=1714774&r1=1714773&r2=1714774&view=diff
==============================================================================
--- serf/trunk/protocols/fcgi_buckets.h (original)
+++ serf/trunk/protocols/fcgi_buckets.h Tue Nov 17 12:49:11 2015
@@ -66,6 +66,15 @@ apr_status_t serf__bucket_fcgi_unframe_r
                                                  apr_uint16_t *frame_type);
 
 /* ==================================================================== */
+extern const serf_bucket_type_t serf_bucket_type__fcgi_params_decode;
+#define SERF__BUCKET_IS_FCGI_PARAMS_DECODE(b)               \
+            SERF_BUCKET_CHECK((b), _fcgi_params_decode)
+
+serf_bucket_t *
+serf__bucket_fcgi_params_decode_create(serf_bucket_t *stream,
+                                       serf_bucket_alloc_t *alloc);
+
+/* ==================================================================== */
 extern const serf_bucket_type_t serf_bucket_type__fcgi_frame;
 
 #define SERF__BUCKET_IS_FCGI_FRAME(b) SERF_BUCKET_CHECK((b), _fcgi_frame)

Modified: serf/trunk/protocols/fcgi_protocol.c
URL: 
http://svn.apache.org/viewvc/serf/trunk/protocols/fcgi_protocol.c?rev=1714774&r1=1714773&r2=1714774&view=diff
==============================================================================
--- serf/trunk/protocols/fcgi_protocol.c (original)
+++ serf/trunk/protocols/fcgi_protocol.c Tue Nov 17 12:49:11 2015
@@ -31,6 +31,9 @@
 #include "protocols/fcgi_buckets.h"
 #include "protocols/fcgi_protocol.h"
 
+#define SERF_ERROR_FCGI_RECORD_SIZE_ERROR   SERF_ERROR_HTTP2_FRAME_SIZE_ERROR
+#define SERF_ERROR_FCGI_PROTOCOL_ERROR      SERF_ERROR_HTTP2_PROTOCOL_ERROR
+
 typedef struct serf_fcgi_protocol_t
 {
     serf_context_t *ctx;
@@ -53,6 +56,10 @@ typedef struct serf_fcgi_protocol_t
     serf_bucket_t *read_frame;
     bool in_frame;
 
+    serf_fcgi_stream_t *first, *last;
+
+    bool no_keep_conn;
+
 } serf_fcgi_protocol_t;
 
 static apr_status_t fcgi_cleanup(void *baton)
@@ -64,6 +71,31 @@ static apr_status_t fcgi_cleanup(void *b
     return APR_SUCCESS;
 }
 
+/* Implements serf_bucket_prefix_handler_t.
+   Handles the body of an FCGI_BEGIN_REQUEST record: BATON is the stream
+   the record belongs to, DATA/LEN the record body. Stores the requested
+   role on the stream and notes on the connection when the peer did not
+   ask to keep it open.
+   Returns SERF_ERROR_FCGI_RECORD_SIZE_ERROR when LEN is not exactly the
+   size of FCGI_BeginRequestBody. */
+static apr_status_t fcgi_begin_request(void *baton,
+                                       serf_bucket_t *bucket,
+                                       const char *data,
+                                       apr_size_t len)
+{
+    serf_fcgi_stream_t *stream = baton;
+    const FCGI_BeginRequestBody *brb;
+
+    if (len != sizeof(*brb))
+        return SERF_ERROR_FCGI_RECORD_SIZE_ERROR;
+
+    brb = (const void*)data;
+
+    /* The role is sent as two bytes, most significant byte first */
+    stream->role = (brb->roleB1 << 8) | (brb->roleB0);
+
+    if (!(brb->flags & FCGI_KEEP_CONN))
+        stream->fcgi->no_keep_conn = true;
+
+
+    return APR_SUCCESS;
+}
+
+
 /* Implements the serf_bucket_end_of_frame_t callback */
 static apr_status_t
 fcgi_end_of_frame(void *baton,
@@ -164,7 +196,6 @@ static apr_status_t fcgi_process(serf_fc
             void *process_baton = NULL;
             serf_bucket_t *process_bucket = NULL;
             serf_fcgi_stream_t *stream;
-            apr_uint32_t reset_reason;
 
             status = serf__bucket_fcgi_unframe_read_info(body, &sid,
                                                          &frametype);
@@ -195,6 +226,25 @@ static apr_status_t fcgi_process(serf_fc
             switch (frametype)
             {
                 case FCGI_FRAMETYPE(FCGI_V1, FCGI_BEGIN_REQUEST):
+                    stream = serf_fcgi__stream_get(fcgi, sid, FALSE, FALSE);
+
+                    if (stream) {
+                        /* Stream must be new */
+                        return SERF_ERROR_FCGI_PROTOCOL_ERROR;
+                    }
+                    stream = serf_fcgi__stream_get(fcgi, sid, TRUE, TRUE);
+
+                    remaining = (apr_size_t)serf_bucket_get_remaining(body);
+                    if (remaining != sizeof(FCGI_BeginRequestBody)) {
+                        return SERF_ERROR_FCGI_RECORD_SIZE_ERROR;
+                    }
+                    body = serf_bucket_prefix_create(
+                                        body,
+                                        sizeof(FCGI_BeginRequestBody),
+                                        fcgi_begin_request, stream,
+                                        fcgi->allocator);
+
+                    /* Just reading will handle this frame now*/
                     process_bucket = body;
                     break;
                 case FCGI_FRAMETYPE(FCGI_V1, FCGI_ABORT_REQUEST):
@@ -204,10 +254,44 @@ static apr_status_t fcgi_process(serf_fc
                     process_bucket = body;
                     break;
                 case FCGI_FRAMETYPE(FCGI_V1, FCGI_PARAMS):
-                    process_bucket = body;
+                    stream = serf_fcgi__stream_get(fcgi, sid, FALSE, FALSE);
+                    if (!stream) {
+                        return SERF_ERROR_FCGI_PROTOCOL_ERROR;
+                    }
+
+                    body = serf_fcgi__stream_handle_params(stream, body,
+                                                           fcgi->allocator);
+
+                    if (body) {
+                        /* We will take care of discarding */
+                        process_bucket = body;
+                    }
+                    else
+                    {
+                        /* The stream wants to handle the reading itself */
+                        process_handler = serf_fcgi__stream_processor;
+                        process_baton = stream;
+                    }
                     break;
                 case FCGI_FRAMETYPE(FCGI_V1, FCGI_STDIN):
-                    process_bucket = body;
+                    stream = serf_fcgi__stream_get(fcgi, sid, FALSE, FALSE);
+                    if (!stream) {
+                        return SERF_ERROR_FCGI_PROTOCOL_ERROR;
+                    }
+
+                    body = serf_fcgi__stream_handle_stdin(stream, body,
+                                                          fcgi->allocator);
+
+                    if (body) {
+                        /* We will take care of discarding */
+                        process_bucket = body;
+                    }
+                    else
+                    {
+                        /* The stream wants to handle the reading itself */
+                        process_handler = serf_fcgi__stream_processor;
+                        process_baton = stream;
+                    }
                     break;
                 case FCGI_FRAMETYPE(FCGI_V1, FCGI_STDOUT):
                     process_bucket = body;
@@ -263,6 +347,11 @@ static apr_status_t fcgi_read(serf_fcgi_
 
 static apr_status_t fcgi_write(serf_fcgi_protocol_t *fcgi)
 {
+    return APR_SUCCESS;
+}
+
+static apr_status_t fcgi_hangup(serf_fcgi_protocol_t *fcgi)
+{
     return APR_ENOTIMPL;
 }
 
@@ -271,6 +360,52 @@ static apr_status_t fcgi_teardown(serf_f
     return APR_ENOTIMPL;
 }
 
+/* Placeholder for moving STREAM to the front of the protocol's stream
+   list. Called by serf_fcgi__stream_get() when move_first is set;
+   currently does nothing. */
+static void
+move_to_head(serf_fcgi_stream_t *stream)
+{
+    /* Not implemented yet */
+}
+
+/* Looks up the stream with id STREAMID in the stream list of FCGI.
+
+   Id 0 never maps to a stream. When the stream exists it is returned,
+   after asking move_to_head() to reorder it if MOVE_FIRST is set and it
+   is not already the first entry. When it does not exist and CREATE is
+   set, a new stream is created and linked in at the head of the list.
+   Returns NULL in all other cases. */
+serf_fcgi_stream_t *
+serf_fcgi__stream_get(serf_fcgi_protocol_t *fcgi,
+                      apr_uint16_t streamid,
+                      bool create,
+                      bool move_first)
+{
+    serf_fcgi_stream_t *cur;
+    serf_fcgi_stream_t *created;
+
+    if (streamid == 0)
+        return NULL;
+
+    cur = fcgi->first;
+    while (cur != NULL) {
+        if (cur->streamid == streamid) {
+            if (move_first && cur != fcgi->first)
+                move_to_head(cur);
+
+            return cur;
+        }
+        cur = cur->next;
+    }
+
+    if (!create)
+        return NULL;
+
+    created = serf_fcgi__stream_create(fcgi, streamid, fcgi->allocator);
+
+    if (fcgi->first == NULL) {
+        /* First stream on this connection: it is both head and tail */
+        fcgi->first = created;
+        fcgi->last = created;
+    }
+    else {
+        /* Link in front of the current head */
+        created->next = fcgi->first;
+        fcgi->first->prev = created;
+        fcgi->first = created;
+    }
+
+    return created;
+}
+
 
 /* --------------- connection support --------------- */
 static apr_status_t fcgi_outgoing_read(serf_connection_t *conn)
@@ -287,21 +422,21 @@ static apr_status_t fcgi_outgoing_write(
 {
     serf_fcgi_protocol_t *fcgi = conn->protocol_baton;
 
-    return fcgi_read(fcgi);
+    return fcgi_teardown(fcgi);
 }
 
 static apr_status_t fcgi_outgoing_hangup(serf_connection_t *conn)
 {
     serf_fcgi_protocol_t *fcgi = conn->protocol_baton;
 
-    return fcgi_read(fcgi);
+    return fcgi_teardown(fcgi);
 }
 
 static apr_status_t fcgi_outgoing_teardown(serf_connection_t *conn)
 {
     serf_fcgi_protocol_t *fcgi = conn->protocol_baton;
 
-    return fcgi_read(fcgi);
+    return fcgi_teardown(fcgi);
 }
 
 void serf__fcgi_protocol_init(serf_connection_t *conn)
@@ -353,21 +488,26 @@ static apr_status_t fcgi_server_write(se
 {
     serf_fcgi_protocol_t *fcgi = client->protocol_baton;
 
-    return fcgi_read(fcgi);
+    if (!fcgi->stream) {
+        fcgi->stream = client->stream;
+        fcgi->ostream = client->ostream_tail;
+    }
+
+    return fcgi_write(fcgi);
 }
 
 static apr_status_t fcgi_server_hangup(serf_incoming_t *client)
 {
     serf_fcgi_protocol_t *fcgi = client->protocol_baton;
 
-    return fcgi_read(fcgi);
+    return fcgi_hangup(fcgi);
 }
 
 static apr_status_t fcgi_server_teardown(serf_incoming_t *client)
 {
     serf_fcgi_protocol_t *fcgi = client->protocol_baton;
 
-    return fcgi_read(fcgi);
+    return fcgi_teardown(fcgi);
 }
 
 void serf__fcgi_protocol_init_server(serf_incoming_t *client)

Modified: serf/trunk/protocols/fcgi_protocol.h
URL: 
http://svn.apache.org/viewvc/serf/trunk/protocols/fcgi_protocol.h?rev=1714774&r1=1714773&r2=1714774&view=diff
==============================================================================
--- serf/trunk/protocols/fcgi_protocol.h (original)
+++ serf/trunk/protocols/fcgi_protocol.h Tue Nov 17 12:49:11 2015
@@ -46,6 +46,37 @@ typedef struct serf_fcgi_stream_data_t s
 
 #define FCGI_V1     0x1
 
+/* From protocol specs */
+/*
+* Listening socket file number
+*/
+#define FCGI_LISTENSOCK_FILENO 0
+
+typedef struct FCGI_Header {
+    unsigned char version;
+    unsigned char type;
+    unsigned char requestIdB1;
+    unsigned char requestIdB0;
+    unsigned char contentLengthB1;
+    unsigned char contentLengthB0;
+    unsigned char paddingLength;
+    unsigned char reserved;
+} FCGI_Header;
+
+/*
+ * Number of bytes in a FCGI_Header.  Future versions of the protocol
+ * will not reduce this number.
+ */
+#define FCGI_HEADER_LEN  8
+
+/*
+ * Value for version component of FCGI_Header
+ */
+#define FCGI_VERSION_1           1
+
+/*
+ * Values for type component of FCGI_Header
+ */
 #define FCGI_BEGIN_REQUEST       1
 #define FCGI_ABORT_REQUEST       2
 #define FCGI_END_REQUEST         3
@@ -59,12 +90,85 @@ typedef struct serf_fcgi_stream_data_t s
 #define FCGI_UNKNOWN_TYPE       11
 #define FCGI_MAXTYPE (FCGI_UNKNOWN_TYPE)
 
+/*
+* Value for requestId component of FCGI_Header
+*/
+#define FCGI_NULL_REQUEST_ID     0
+
+typedef struct FCGI_BeginRequestBody {
+    unsigned char roleB1;
+    unsigned char roleB0;
+    unsigned char flags;
+    unsigned char reserved[5];
+} FCGI_BeginRequestBody;
+
+typedef struct FCGI_BeginRequestRecord {
+    FCGI_Header header;
+    FCGI_BeginRequestBody body;
+} FCGI_BeginRequestRecord;
+
+/*
+ * Mask for flags component of FCGI_BeginRequestBody
+ */
+#define FCGI_KEEP_CONN  1
+
+/*
+ * Values for role component of FCGI_BeginRequestBody
+ */
+#define FCGI_RESPONDER  1
+#define FCGI_AUTHORIZER 2
+#define FCGI_FILTER     3
+
+typedef struct FCGI_EndRequestBody {
+    unsigned char appStatusB3;
+    unsigned char appStatusB2;
+    unsigned char appStatusB1;
+    unsigned char appStatusB0;
+    unsigned char protocolStatus;
+    unsigned char reserved[3];
+} FCGI_EndRequestBody;
+
+typedef struct FCGI_EndRequestRecord {
+    FCGI_Header header;
+    FCGI_EndRequestBody body;
+} FCGI_EndRequestRecord;
+
+/*
+* Values for protocolStatus component of FCGI_EndRequestBody
+*/
+#define FCGI_REQUEST_COMPLETE 0
+#define FCGI_CANT_MPX_CONN    1
+#define FCGI_OVERLOADED       2
+#define FCGI_UNKNOWN_ROLE     3
+
+/*
+* Variable names for FCGI_GET_VALUES / FCGI_GET_VALUES_RESULT records
+*/
+#define FCGI_MAX_CONNS  "FCGI_MAX_CONNS"
+#define FCGI_MAX_REQS   "FCGI_MAX_REQS"
+#define FCGI_MPXS_CONNS "FCGI_MPXS_CONNS"
+
+typedef struct FCGI_UnknownTypeBody {
+    unsigned char type;
+    unsigned char reserved[7];
+} FCGI_UnknownTypeBody;
+
+typedef struct FCGI_UnknownTypeRecord {
+    FCGI_Header header;
+    FCGI_UnknownTypeBody body;
+} FCGI_UnknownTypeRecord;
+
+
+/**************************************************/
 
 typedef struct serf_fcgi_stream_t
 {
-    struct serf_fcgi_protocol_t *h2;
+    struct serf_fcgi_protocol_t *fcgi;
     serf_bucket_alloc_t *alloc;
 
+    apr_uint16_t streamid;
+    apr_uint16_t role;
+
     /* Opaque implementation details */
     serf_fcgi_stream_data_t *data;
 
@@ -78,6 +182,31 @@ typedef apr_status_t(*serf_fcgi_processo
                                              serf_bucket_t *body);
 
 
+/* From fcgi_protocol.c */
+serf_fcgi_stream_t * serf_fcgi__stream_get(serf_fcgi_protocol_t *fcgi,
+                                           apr_uint16_t streamid,
+                                           bool create,
+                                           bool move_first);
+
+
+/* From fcgi_stream.c */
+serf_fcgi_stream_t * serf_fcgi__stream_create(serf_fcgi_protocol_t *fcgi,
+                                              apr_uint16_t streamid,
+                                              serf_bucket_alloc_t *alloc);
+
+apr_status_t serf_fcgi__stream_processor(void *baton,
+                                         serf_fcgi_protocol_t *fcgi,
+                                         serf_bucket_t *body);
+
+serf_bucket_t * serf_fcgi__stream_handle_params(serf_fcgi_stream_t *stream,
+                                                serf_bucket_t *body,
+                                                serf_bucket_alloc_t *alloc);
+
+serf_bucket_t * serf_fcgi__stream_handle_stdin(serf_fcgi_stream_t *stream,
+                                               serf_bucket_t *body,
+                                               serf_bucket_alloc_t *alloc);
+
+
 
 #ifdef __cplusplus
 }

Modified: serf/trunk/protocols/fcgi_stream.c
URL: 
http://svn.apache.org/viewvc/serf/trunk/protocols/fcgi_stream.c?rev=1714774&r1=1714773&r2=1714774&view=diff
==============================================================================
--- serf/trunk/protocols/fcgi_stream.c (original)
+++ serf/trunk/protocols/fcgi_stream.c Tue Nov 17 12:49:11 2015
@@ -32,26 +32,167 @@
 
 struct serf_fcgi_stream_data_t
 {
-    int dummy;
+    serf_bucket_t *req_agg;
+    bool headers_eof;
+    bool stdin_eof;
+
+    serf_request_t *request;
+    serf_incoming_request_t *in_request;
 };
 
-serf_fcgi_stream_t *
-serf_fcgi__stream_create(serf_fcgi_protocol_t *h2,
-                          apr_int32_t streamid,
-                          apr_uint32_t lr_window,
-                          apr_uint32_t rl_window,
-                          serf_bucket_alloc_t *alloc)
+serf_fcgi_stream_t * serf_fcgi__stream_create(serf_fcgi_protocol_t *fcgi,
+                                              apr_uint16_t streamid,
+                                              serf_bucket_alloc_t *alloc)
 {
     serf_fcgi_stream_t *stream = serf_bucket_mem_alloc(alloc,
                                                         sizeof(*stream));
 
-    stream->h2 = h2;
+    stream->fcgi = fcgi;
     stream->alloc = alloc;
+    stream->streamid = streamid;
 
     stream->next = stream->prev = NULL;
 
     /* Delay creating this? */
-    stream->data = serf_bucket_mem_alloc(alloc, sizeof(*stream->data));
+    stream->data = serf_bucket_mem_calloc(alloc, sizeof(*stream->data));
 
     return stream;
 }
+
+/* Hold-open callback of the request aggregate, which is what requests
+   will see as the actual body. Reports APR_EOF once the end of stdin
+   has been recorded for the stream (BATON) and APR_EAGAIN before that. */
+static apr_status_t stream_agg_eof(void *baton,
+                                   serf_bucket_t *bucket)
+{
+    serf_fcgi_stream_t *stream = baton;
+
+    if (stream->data->stdin_eof)
+        return APR_EOF;
+
+    return APR_EAGAIN;
+}
+
+/* Handles the body of an FCGI_PARAMS record for STREAM.
+
+   A non-empty BODY is wrapped in a params decode bucket; an empty one
+   marks the end of the headers. Either way the bucket is appended to
+   the stream's request aggregate, which is created on first use.
+   Always returns NULL, which tells the caller that the stream handles
+   the reading itself. ALLOC is not used. */
+serf_bucket_t * serf_fcgi__stream_handle_params(serf_fcgi_stream_t *stream,
+                                                serf_bucket_t *body,
+                                                serf_bucket_alloc_t *alloc)
+{
+    serf_fcgi_stream_data_t *sd = stream->data;
+
+    if (sd->req_agg == NULL) {
+        sd->req_agg = serf_bucket_aggregate_create(stream->alloc);
+
+        serf_bucket_aggregate_hold_open(sd->req_agg, stream_agg_eof, stream);
+    }
+
+    if ((apr_size_t)serf_bucket_get_remaining(body) != 0)
+        body = serf__bucket_fcgi_params_decode_create(body, body->allocator);
+    else
+        sd->headers_eof = true;
+
+    /* The (possibly wrapped) body goes into the aggregate in both cases */
+    serf_bucket_aggregate_append(sd->req_agg, body);
+
+    return NULL;
+}
+
+/* Handles the body of an FCGI_STDIN record for STREAM.
+
+   Asserts that the end of the headers was already seen. An empty BODY
+   marks the end of stdin. BODY is appended to the stream's request
+   aggregate, which is created on first use. Always returns NULL, which
+   tells the caller that the stream handles the reading itself. ALLOC is
+   not used. */
+serf_bucket_t * serf_fcgi__stream_handle_stdin(serf_fcgi_stream_t *stream,
+                                               serf_bucket_t *body,
+                                               serf_bucket_alloc_t *alloc)
+{
+    serf_fcgi_stream_data_t *sd;
+
+    SERF_FCGI_assert(stream->data->headers_eof);
+
+    sd = stream->data;
+    if (sd->req_agg == NULL) {
+        sd->req_agg = serf_bucket_aggregate_create(stream->alloc);
+
+        serf_bucket_aggregate_hold_open(sd->req_agg, stream_agg_eof, stream);
+    }
+
+    if ((apr_size_t)serf_bucket_get_remaining(body) == 0)
+        sd->stdin_eof = true;
+
+    /* The body goes into the aggregate whether it is empty or not */
+    serf_bucket_aggregate_append(sd->req_agg, body);
+
+    return NULL;
+}
+
+
+/* Processor for records whose reading is left to the stream (BATON).
+
+   When an incoming request is attached to the stream, its handler is
+   run on the request bucket first. Afterwards whatever is readable from
+   the stream's request aggregate is read and dropped, and the aggregate
+   is destroyed once it reports EOF. Returns the last status seen. */
+apr_status_t serf_fcgi__stream_processor(void *baton,
+                                         serf_fcgi_protocol_t *fcgi,
+                                         serf_bucket_t *body)
+{
+    serf_fcgi_stream_t *stream = baton;
+    apr_status_t status = APR_SUCCESS;
+
+    SERF_FCGI_assert(stream->data->req_agg != NULL);
+
+    if (stream->data->request) {
+        /* ### TODO */
+    }
+    else if (stream->data->in_request) {
+        serf_incoming_request_t *request = stream->data->in_request;
+
+        SERF_FCGI_assert(request->req_bkt != NULL);
+
+        status = request->handler(request, request->req_bkt,
+                                  request->handler_baton,
+                                  request->pool);
+
+        /* Neither EOF nor an error (this includes success): hand the
+           status straight back to the caller */
+        if (!APR_STATUS_IS_EOF(status)
+            && !SERF_BUCKET_READ_ERROR(status))
+            return status;
+
+        /* The handler reported EOF on the request: create the response */
+        if (APR_STATUS_IS_EOF(status)) {
+            status = serf_incoming_response_create(request);
+
+            if (status)
+                return status;
+        }
+
+        if (SERF_BUCKET_READ_ERROR(status)) {
+
+            /* SEND ERROR status */
+
+            return status;
+        }
+    }
+
+    while (!status)
+    {
+        struct iovec vecs[IOV_MAX];
+        int vecs_used;
+
+        /* Drain the bucket as efficiently as possible */
+        status = serf_bucket_read_iovec(stream->data->req_agg,
+                                        SERF_READ_ALL_AVAIL,
+                                        IOV_MAX, vecs, &vecs_used);
+
+        if (vecs_used) {
+            /* We have data... What should we do with it? */
+            /*int i;
+
+            for (i = 0; i < vecs_used; i++)
+                fprintf(stderr, "%.*s", vecs[i].iov_len, vecs[i].iov_base);*/
+        }
+    }
+
+    if (APR_STATUS_IS_EOF(status))
+    {
+        /* If there was a request, it is already gone, so we can now safely
+        destroy our aggregate, which may include everything up to the fcgi
+        records */
+        serf_bucket_destroy(stream->data->req_agg);
+        stream->data->req_agg = NULL;
+    }
+
+    return status;
+}
+


Reply via email to