Author: rhuijben
Date: Tue Nov 17 16:07:48 2015
New Revision: 1714818

URL: http://svn.apache.org/viewvc?rev=1714818&view=rev
Log:
Start sending a response back to the fcgi server. This will actually make
the result visible in a web browser when using an httpd configured with

ProxyPass "/d/fcgi/" "fcgi://127.0.0.1:4000/" enablereuse=on

And a serf_httpd run as
$ serf_httpd --listen fcgi,172.24.0.11:4000 /my/root

* buckets/fcgi_buckets.c
  (fcgi_frame_ctx_t): New struct.
  (serf__bucket_fcgi_frame_create): Extend arguments.
  (serf_fcgi_frame_refill): New function.
  (serf_fcgi_frame_read,
   serf_fcgi_frame_read_iovec,
   serf_fcgi_frame_peek,
   serf_fcgi_frame_destroy,
   serf_fcgi_frame_get_remaining,
   serf_fcgi_frame_set_config): Implement.

* protocols/fcgi_buckets.h
  (serf__bucket_fcgi_frame_create): Add two arguments.

* protocols/fcgi_protocol.c
  (serf_fcgi__enqueue_frame): New function. Based on http2 version.
  (fcgi_write): Implement.

* protocols/fcgi_protocol.h
  (serf_fcgi__enqueue_frame): New function.

* protocols/fcgi_stream.c
  (includes): Add apr_date.h.
  (fcgi_stream_enqueue_response): Create proper response.

* test/serf_httpd.c
  (request_generate_response): Remove chunked encoding from http/1 testing.
    This doesn't belong on this layer.

Modified:
    serf/trunk/buckets/fcgi_buckets.c
    serf/trunk/protocols/fcgi_buckets.h
    serf/trunk/protocols/fcgi_protocol.c
    serf/trunk/protocols/fcgi_protocol.h
    serf/trunk/protocols/fcgi_stream.c
    serf/trunk/test/serf_httpd.c

Modified: serf/trunk/buckets/fcgi_buckets.c
URL: http://svn.apache.org/viewvc/serf/trunk/buckets/fcgi_buckets.c?rev=1714818&r1=1714817&r2=1714818&view=diff
==============================================================================
--- serf/trunk/buckets/fcgi_buckets.c (original)
+++ serf/trunk/buckets/fcgi_buckets.c Tue Nov 17 16:07:48 2015
@@ -600,23 +600,165 @@ extern const serf_bucket_type_t serf_buc
     serf_default_ignore_config
 };
 /* ==================================================================== */
+typedef struct fcgi_frame_ctx_t
+{
+    serf_bucket_t *stream;
+    serf_bucket_t *agg;
+    apr_uint16_t stream_id;
+    apr_uint16_t frame_type;
+    bool send_stream;
+    bool send_eof;
+    bool at_eof;
+
+    char record_data[FCGI_RECORD_SIZE];
+} fcgi_frame_ctx_t;
+
 
 serf_bucket_t *
 serf__bucket_fcgi_frame_create(serf_bucket_t *stream,
                                apr_uint16_t stream_id,
                                apr_uint16_t frame_type,
+                               bool send_as_stream,
+                               bool send_eof,
                                serf_bucket_alloc_t *alloc)
 {
-    return NULL;
+    fcgi_frame_ctx_t *ctx;
+
+    ctx = serf_bucket_mem_alloc(alloc, sizeof(*ctx));
+    ctx->stream = stream;
+    ctx->stream_id = stream_id;
+    ctx->frame_type = frame_type;
+    ctx->send_stream = send_as_stream;
+    ctx->send_eof = send_eof;
+    ctx->at_eof = false;
+    ctx->agg = NULL;
+
+    return serf_bucket_create(&serf_bucket_type__fcgi_frame, alloc, ctx);
 }
 
+static apr_status_t serf_fcgi_frame_refill(serf_bucket_t *bucket)
+{
+    fcgi_frame_ctx_t *ctx = bucket->data;
+    apr_status_t status;
+    serf_bucket_t *head = NULL;
+    apr_size_t payload;
+
+    if (ctx->at_eof)
+        return APR_EOF;
+
+    if (!ctx->agg)
+        ctx->agg = serf_bucket_aggregate_create(bucket->allocator);
+
+    if (!ctx->stream)
+    {
+        payload = 0;
+    }
+    else if (ctx->send_stream)
+    {
+        apr_uint64_t remaining;
+        serf_bucket_split_create(&head, &ctx->stream, ctx->stream,
+                                 8192, 0xFFF0); /* Some guesses */
+
+        remaining = serf_bucket_get_remaining(head);
+        if (remaining != SERF_LENGTH_UNKNOWN) {
+            serf_bucket_aggregate_append(ctx->agg, head);
+            payload = (apr_size_t)remaining;
+        }
+        else if (remaining == 0)
+        {
+            payload = 0;
+            serf_bucket_destroy(head);
+            ctx->at_eof = true;
+        }
+        else
+        {
+            struct iovec vecs[IOV_MAX];
+            int vecs_used;
+
+            status = serf_bucket_read_iovec(head, 0xFFF0,
+                                            IOV_MAX, vecs, &vecs_used);
+
+            if (SERF_BUCKET_READ_ERROR(status))
+                return status;
+
+            if (vecs_used) {
+                serf_bucket_aggregate_append_iovec(ctx->agg, vecs, vecs_used);
+                payload = (apr_size_t)serf_bucket_get_remaining(ctx->agg);
+
+                if (APR_STATUS_IS_EOF(status)) {
+                    /* Keep head alive by appending it to the aggregate */
+                    serf_bucket_aggregate_append(ctx->agg, head);
+                }
+                else {
+                    /* Write a new record for the remaining part of head :( */
+                    serf_bucket_aggregate_append(ctx->agg,
+                                serf__bucket_fcgi_frame_create(head, ctx->stream_id,
+                                                            ctx->frame_type,
+                                                            true /* send_as_stream */,
+                                                            false /* send_eof */,
+                                                            bucket->allocator));
+                }
+            }
+            else
+                payload = 0;
+
+            if (APR_STATUS_IS_EOF(status))
+                ctx->at_eof = true;
+        }
+
+        if (!payload && !ctx->send_eof)
+            return APR_SUCCESS;
+    }
+    else
+    {
+        abort(); /* ### TODO */
+    }
+
+    /* Create FCGI record */
+    ctx->record_data[0] = (ctx->frame_type >> 8);
+    ctx->record_data[1] = (ctx->frame_type & 0xFF);
+    ctx->record_data[2] = (ctx->stream_id >> 8);
+    ctx->record_data[3] = (ctx->stream_id & 0xFF);
+    ctx->record_data[4] = (payload >> 8) & 0xFF;
+    ctx->record_data[5] = (payload & 0xFF);
+    ctx->record_data[6] = 0; /* padding */
+    ctx->record_data[7] = 0; /* reserved */
+
+    serf_bucket_aggregate_prepend(ctx->agg,
+                                  serf_bucket_simple_create(ctx->record_data,
+                                                            FCGI_RECORD_SIZE,
+                                                            NULL, NULL,
+                                                            bucket->allocator));
+    return APR_SUCCESS;
+}
 
 static apr_status_t serf_fcgi_frame_read(serf_bucket_t *bucket,
                                          apr_size_t requested,
                                          const char **data,
                                          apr_size_t *len)
 {
-    return APR_ENOTIMPL;
+    fcgi_frame_ctx_t *ctx = bucket->data;
+    apr_status_t status;
+
+    if (ctx->agg) {
+        status = serf_bucket_read(ctx->agg, requested, data, len);
+
+        if (!APR_STATUS_IS_EOF(status))
+            return status;
+    }
+
+    status = serf_fcgi_frame_refill(bucket);
+    if (status) {
+        *len = 0;
+        return status;
+    }
+
+    status = serf_bucket_read(ctx->agg, requested, data, len);
+
+    if (APR_STATUS_IS_EOF(status) && !ctx->at_eof)
+        status = APR_SUCCESS;
+
+    return status;
 }
 
 static apr_status_t serf_fcgi_frame_read_iovec(serf_bucket_t *bucket,
@@ -625,30 +767,89 @@ static apr_status_t serf_fcgi_frame_read
                                                struct iovec *vecs,
                                                int *vecs_used)
 {
-    return APR_ENOTIMPL;
+    fcgi_frame_ctx_t *ctx = bucket->data;
+    apr_status_t status;
+
+    if (ctx->agg) {
+        status = serf_bucket_read_iovec(ctx->agg, requested, vecs_size,
+                                        vecs, vecs_used);
+
+        if (!APR_STATUS_IS_EOF(status))
+            return status;
+    }
+
+    status = serf_fcgi_frame_refill(bucket);
+    if (status) {
+        *vecs_used = 0;
+        return status;
+    }
+
+    status = serf_bucket_read_iovec(ctx->agg, requested, vecs_size,
+                                        vecs, vecs_used);
+
+    if (APR_STATUS_IS_EOF(status) && !ctx->at_eof)
+        status = APR_SUCCESS;
+
+    return status;
 }
 
 static apr_status_t serf_fcgi_frame_peek(serf_bucket_t *bucket,
                                          const char **data,
                                          apr_size_t *len)
 {
-    return APR_ENOTIMPL;
+    fcgi_frame_ctx_t *ctx = bucket->data;
+    apr_status_t status;
+
+    if (ctx->agg) {
+        status = serf_bucket_peek(ctx->agg, data, len);
+
+        if (!APR_STATUS_IS_EOF(status))
+            return status;
+    }
+
+    status = serf_fcgi_frame_refill(bucket);
+    if (status) {
+        *len = 0;
+        return status;
+    }
+
+    status = serf_bucket_peek(ctx->agg, data, len);
+
+    if (APR_STATUS_IS_EOF(status) && !ctx->at_eof)
+        status = APR_SUCCESS;
+
+    return status;
 }
 
 static void serf_fcgi_frame_destroy(serf_bucket_t *bucket)
 {
+    fcgi_frame_ctx_t *ctx = bucket->data;
+
+    if (ctx->agg)
+        serf_bucket_destroy(ctx->agg);
+    if (ctx->stream)
+        serf_bucket_destroy(ctx->stream);
+
     serf_default_destroy_and_data(bucket);
 }
 
 static apr_uint64_t serf_fcgi_frame_get_remaining(serf_bucket_t *bucket)
 {
-    return APR_ENOTIMPL;
+    return SERF_LENGTH_UNKNOWN;
 }
 
 static apr_status_t serf_fcgi_frame_set_config(serf_bucket_t *bucket,
-                                                 serf_config_t *config)
+                                               serf_config_t *config)
 {
-    return APR_ENOTIMPL;
+    fcgi_frame_ctx_t *ctx = bucket->data;
+
+    if (!ctx->agg)
+        ctx->agg = serf_bucket_aggregate_create(bucket->allocator);
+    serf_bucket_set_config(ctx->agg, config);
+    if (ctx->stream)
+        serf_bucket_set_config(ctx->stream, config);
+
+    return APR_SUCCESS;
 }
 
 extern const serf_bucket_type_t serf_bucket_type__fcgi_frame =

Modified: serf/trunk/protocols/fcgi_buckets.h
URL: http://svn.apache.org/viewvc/serf/trunk/protocols/fcgi_buckets.h?rev=1714818&r1=1714817&r2=1714818&view=diff
==============================================================================
--- serf/trunk/protocols/fcgi_buckets.h (original)
+++ serf/trunk/protocols/fcgi_buckets.h Tue Nov 17 16:07:48 2015
@@ -83,6 +83,8 @@ serf_bucket_t *
 serf__bucket_fcgi_frame_create(serf_bucket_t *stream,
                                apr_uint16_t stream_id,
                                apr_uint16_t frame_type,
+                               bool send_as_stream,
+                               bool send_eof,
                                serf_bucket_alloc_t *alloc);
 
 #ifdef __cplusplus

Modified: serf/trunk/protocols/fcgi_protocol.c
URL: http://svn.apache.org/viewvc/serf/trunk/protocols/fcgi_protocol.c?rev=1714818&r1=1714817&r2=1714818&view=diff
==============================================================================
--- serf/trunk/protocols/fcgi_protocol.c (original)
+++ serf/trunk/protocols/fcgi_protocol.c Tue Nov 17 16:07:48 2015
@@ -347,8 +347,76 @@ static apr_status_t fcgi_read(serf_fcgi_
     return APR_SUCCESS;
 }
 
+apr_status_t serf_fcgi__enqueue_frame(serf_fcgi_protocol_t *fcgi,
+                                      serf_bucket_t *frame,
+                                      bool pump)
+{
+    apr_status_t status;
+    bool want_write;
+
+    if (!pump && !*fcgi->dirty_pollset)
+    {
+        const char *data;
+        apr_size_t len;
+
+        /* Cheap check to see if we should request a write
+        event next time around */
+        status = serf_bucket_peek(fcgi->ostream, &data, &len);
+
+        if (SERF_BUCKET_READ_ERROR(status))
+        {
+            serf_bucket_destroy(frame);
+            return status;
+        }
+
+        if (len == 0)
+        {
+            *fcgi->dirty_pollset = true;
+            fcgi->ctx->dirty_pollset = true;
+        }
+    }
+
+    serf_bucket_aggregate_append(fcgi->ostream, frame);
+
+    if (!pump)
+        return APR_SUCCESS;
+
+    /* Flush final output buffer (after ssl, etc.) */
+    if (fcgi->conn)
+        status = serf__connection_flush(fcgi->conn, TRUE);
+    else
+        status = serf__incoming_client_flush(fcgi->client, TRUE);
+
+    want_write = APR_STATUS_IS_EAGAIN(status);
+
+    if ((want_write && !(*fcgi->req_events & APR_POLLOUT))
+        || (!want_write && (*fcgi->req_events & APR_POLLOUT)))
+    {
+        *fcgi->dirty_pollset = true;
+        fcgi->ctx->dirty_pollset = true;
+    }
+
+    return status;
+}
+
 static apr_status_t fcgi_write(serf_fcgi_protocol_t *fcgi)
 {
+    apr_status_t status;
+
+    if (fcgi->client)
+        status = serf__incoming_client_flush(fcgi->client, true);
+    else
+        status = serf__connection_flush(fcgi->conn, true);
+
+    if (APR_STATUS_IS_EAGAIN(status))
+        return APR_SUCCESS;
+    else if (status)
+        return status;
+
+    /* Probably nothing to write. */
+    *fcgi->dirty_pollset = true;
+    fcgi->ctx->dirty_pollset = true;
+
     return APR_SUCCESS;
 }
 

Modified: serf/trunk/protocols/fcgi_protocol.h
URL: http://svn.apache.org/viewvc/serf/trunk/protocols/fcgi_protocol.h?rev=1714818&r1=1714817&r2=1714818&view=diff
==============================================================================
--- serf/trunk/protocols/fcgi_protocol.h (original)
+++ serf/trunk/protocols/fcgi_protocol.h Tue Nov 17 16:07:48 2015
@@ -195,6 +195,10 @@ apr_status_t serf_fcgi__setup_incoming_r
     void **req_setup_baton,
     serf_fcgi_protocol_t *fcgi);
 
+apr_status_t serf_fcgi__enqueue_frame(serf_fcgi_protocol_t *fcgi,
+                                      serf_bucket_t *frame,
+                                      bool pump);
+
 
 /* From fcgi_stream.c */
 serf_fcgi_stream_t * serf_fcgi__stream_create(serf_fcgi_protocol_t *fcgi,

Modified: serf/trunk/protocols/fcgi_stream.c
URL: http://svn.apache.org/viewvc/serf/trunk/protocols/fcgi_stream.c?rev=1714818&r1=1714817&r2=1714818&view=diff
==============================================================================
--- serf/trunk/protocols/fcgi_stream.c (original)
+++ serf/trunk/protocols/fcgi_stream.c Tue Nov 17 16:07:48 2015
@@ -22,6 +22,7 @@
 
 #include <apr_pools.h>
 #include <apr_strings.h>
+#include <apr_date.h>
 
 #include "serf.h"
 #include "serf_bucket_util.h"
@@ -77,12 +78,59 @@ fcgi_stream_enqueue_response(serf_incomi
                              void *enqueue_baton,
                              serf_bucket_t *response_bkt)
 {
-    return APR_SUCCESS;
+    serf_fcgi_stream_t *stream = enqueue_baton;
+    serf_bucket_alloc_t *alloc = response_bkt->allocator;
+    serf_linebuf_t *linebuf;
+    serf_bucket_t *agg;
+    serf_bucket_t *tmp;
+    apr_status_t status;
+
+    /* With FCGI we don't send the usual first line of the response.
+       We just send a "Status: 200" instead and the actual http
+       server will handle the rest */
+    agg = serf_bucket_aggregate_create(alloc);
+
+    /* Too big for the stack :( */
+    linebuf = serf_bucket_mem_alloc(alloc, sizeof(*linebuf));
+    serf_linebuf_init(linebuf);
+
+    do
+    {
+        status = serf_linebuf_fetch(linebuf, response_bkt, SERF_NEWLINE_ANY);
+    } while (status == APR_SUCCESS && linebuf->state != SERF_LINEBUF_READY);
+
+    if (status
+        || linebuf->state != SERF_LINEBUF_READY
+        || !apr_date_checkmask(linebuf->line, "HTTP/#.# ###*"))
+    {
+        /* We can't write a response in this state yet :( */
+        serf_bucket_mem_free(alloc, linebuf);
+        return status;
+    }
+
+    tmp = SERF_BUCKET_SIMPLE_STRING("Status: ", alloc);
+    serf_bucket_aggregate_append(agg, tmp);
+
+    tmp = serf_bucket_simple_copy_create(linebuf->line + 9, 3, alloc);
+    serf_bucket_aggregate_append(agg, tmp);
+    serf_bucket_mem_free(alloc, linebuf);
+
+    tmp = SERF_BUCKET_SIMPLE_STRING("\r\n", alloc);
+    serf_bucket_aggregate_append(agg, tmp);
+
+    serf_bucket_aggregate_append(agg, response_bkt);
+
+    return serf_fcgi__enqueue_frame(
+        stream->fcgi,
+        serf__bucket_fcgi_frame_create(agg, stream->streamid,
+                                       FCGI_FRAMETYPE(FCGI_V1, FCGI_STDOUT),
+                                       true, true,
+                                       alloc), true);
 }
 
 static apr_status_t
 stream_setup_request(serf_fcgi_stream_t *stream,
-                      serf_config_t *config)
+                     serf_config_t *config)
 {
     serf_bucket_t *agg;
     apr_status_t status;

Modified: serf/trunk/test/serf_httpd.c
URL: http://svn.apache.org/viewvc/serf/trunk/test/serf_httpd.c?rev=1714818&r1=1714817&r2=1714818&view=diff
==============================================================================
--- serf/trunk/test/serf_httpd.c (original)
+++ serf/trunk/test/serf_httpd.c Tue Nov 17 16:07:48 2015
@@ -141,9 +141,6 @@ static apr_status_t request_generate_res
     tmp = SERF_BUCKET_SIMPLE_STRING("HTTP/1.1 200 OK" CRLF, alloc);
     serf_bucket_aggregate_append(agg, tmp);
 
-    tmp = SERF_BUCKET_SIMPLE_STRING("Transfer-Encoding: chunked" CRLF, alloc);
-    serf_bucket_aggregate_append(agg, tmp);
-
     tmp = SERF_BUCKET_SIMPLE_STRING("Content-Type: text/plain" CRLF, alloc);
     serf_bucket_aggregate_append(agg, tmp);
 
@@ -173,7 +170,7 @@ static apr_status_t request_generate_res
     tmp = SERF_BUCKET_SIMPLE_STRING(CRLF, alloc);
     serf_bucket_aggregate_append(body, tmp);
 
-    tmp = serf_bucket_chunk_create(body, alloc);
+    tmp = body;
     serf_bucket_aggregate_append(agg, tmp);
 
     tmp = SERF_BUCKET_SIMPLE_STRING(CRLF, alloc);


Reply via email to