Author: rhuijben
Date: Tue Nov 17 16:07:48 2015
New Revision: 1714818
URL: http://svn.apache.org/viewvc?rev=1714818&view=rev
Log:
Start sending a response back to the fcgi server. This will actually make
the result visible in a web browser when using an httpd configured with
ProxyPass "/d/fcgi/" "fcgi://127.0.0.1:4000/" enablereuse=on
And a serf_httpd run as
$ serf_httpd --listen fcgi,172.24.0.11:4000 /my/root
* buckets/fcgi_buckets.c
(fcgi_frame_ctx_t): New struct.
(serf__bucket_fcgi_frame_create): Extend arguments.
(serf_fcgi_frame_refill): New function.
(serf_fcgi_frame_read,
serf_fcgi_frame_read_iovec,
serf_fcgi_frame_peek,
serf_fcgi_frame_destroy,
serf_fcgi_frame_get_remaining,
serf_fcgi_frame_set_config): Implement.
* protocols/fcgi_buckets.h
(serf__bucket_fcgi_frame_create): Add two arguments.
* protocols/fcgi_protocol.c
(serf_fcgi__enqueue_frame): New function. Based on http2 version.
(fcgi_write): Implement.
* protocols/fcgi_protocol.h
(serf_fcgi__enqueue_frame): New function.
* protocols/fcgi_stream.c
(includes): Add apr_date.h.
(fcgi_stream_enqueue_response): Create proper response.
* test/serf_httpd.c
(request_generate_response): Remove chunked encoding from the http/1 test
response. This doesn't belong at this layer.
Modified:
serf/trunk/buckets/fcgi_buckets.c
serf/trunk/protocols/fcgi_buckets.h
serf/trunk/protocols/fcgi_protocol.c
serf/trunk/protocols/fcgi_protocol.h
serf/trunk/protocols/fcgi_stream.c
serf/trunk/test/serf_httpd.c
Modified: serf/trunk/buckets/fcgi_buckets.c
URL:
http://svn.apache.org/viewvc/serf/trunk/buckets/fcgi_buckets.c?rev=1714818&r1=1714817&r2=1714818&view=diff
==============================================================================
--- serf/trunk/buckets/fcgi_buckets.c (original)
+++ serf/trunk/buckets/fcgi_buckets.c Tue Nov 17 16:07:48 2015
@@ -600,23 +600,165 @@ extern const serf_bucket_type_t serf_buc
serf_default_ignore_config
};
/* ==================================================================== */
+typedef struct fcgi_frame_ctx_t
+{
+ serf_bucket_t *stream;
+ serf_bucket_t *agg;
+ apr_uint16_t stream_id;
+ apr_uint16_t frame_type;
+ bool send_stream;
+ bool send_eof;
+ bool at_eof;
+
+ char record_data[FCGI_RECORD_SIZE];
+} fcgi_frame_ctx_t;
+
serf_bucket_t *
serf__bucket_fcgi_frame_create(serf_bucket_t *stream,
apr_uint16_t stream_id,
apr_uint16_t frame_type,
+ bool send_as_stream,
+ bool send_eof,
serf_bucket_alloc_t *alloc)
{
- return NULL;
+ fcgi_frame_ctx_t *ctx;
+
+ ctx = serf_bucket_mem_alloc(alloc, sizeof(*ctx));
+ ctx->stream = stream;
+ ctx->stream_id = stream_id;
+ ctx->frame_type = frame_type;
+ ctx->send_stream = send_as_stream;
+ ctx->send_eof = send_eof;
+ ctx->at_eof = false;
+ ctx->agg = NULL;
+
+ return serf_bucket_create(&serf_bucket_type__fcgi_frame, alloc, ctx);
}
+static apr_status_t serf_fcgi_frame_refill(serf_bucket_t *bucket)
+{
+ fcgi_frame_ctx_t *ctx = bucket->data;
+ apr_status_t status;
+ serf_bucket_t *head = NULL;
+ apr_size_t payload;
+
+ if (ctx->at_eof)
+ return APR_EOF;
+
+ if (!ctx->agg)
+ ctx->agg = serf_bucket_aggregate_create(bucket->allocator);
+
+ if (!ctx->stream)
+ {
+ payload = 0;
+ }
+ else if (ctx->send_stream)
+ {
+ apr_uint64_t remaining;
+ serf_bucket_split_create(&head, &ctx->stream, ctx->stream,
+ 8192, 0xFFF0); /* Some guesses */
+
+ remaining = serf_bucket_get_remaining(head);
+ if (remaining != SERF_LENGTH_UNKNOWN) {
+ serf_bucket_aggregate_append(ctx->agg, head);
+ payload = (apr_size_t)remaining;
+ }
+ else if (remaining == 0)
+ {
+ payload = 0;
+ serf_bucket_destroy(head);
+ ctx->at_eof = true;
+ }
+ else
+ {
+ struct iovec vecs[IOV_MAX];
+ int vecs_used;
+
+ status = serf_bucket_read_iovec(head, 0xFFF0,
+ IOV_MAX, vecs, &vecs_used);
+
+ if (SERF_BUCKET_READ_ERROR(status))
+ return status;
+
+ if (vecs_used) {
+ serf_bucket_aggregate_append_iovec(ctx->agg, vecs, vecs_used);
+ payload = (apr_size_t)serf_bucket_get_remaining(ctx->agg);
+
+ if (APR_STATUS_IS_EOF(status)) {
+ /* Keep head alive by appending it to the aggregate */
+ serf_bucket_aggregate_append(ctx->agg, head);
+ }
+ else {
+ /* Write a new record for the remaining part of head :( */
+ serf_bucket_aggregate_append(ctx->agg,
+ serf__bucket_fcgi_frame_create(head,
ctx->stream_id,
+ ctx->frame_type,
+ true /*
send_as_stream */,
+ false /* send_eof
*/,
+
bucket->allocator));
+ }
+ }
+ else
+ payload = 0;
+
+ if (APR_STATUS_IS_EOF(status))
+ ctx->at_eof = true;
+ }
+
+ if (!payload && !ctx->send_eof)
+ return APR_SUCCESS;
+ }
+ else
+ {
+ abort(); /* ### TODO */
+ }
+
+ /* Create FCGI record */
+ ctx->record_data[0] = (ctx->frame_type >> 8);
+ ctx->record_data[1] = (ctx->frame_type & 0xFF);
+ ctx->record_data[2] = (ctx->stream_id >> 8);
+ ctx->record_data[3] = (ctx->stream_id & 0xFF);
+ ctx->record_data[4] = (payload >> 8) & 0xFF;
+ ctx->record_data[5] = (payload & 0xFF);
+ ctx->record_data[6] = 0; /* padding */
+ ctx->record_data[7] = 0; /* reserved */
+
+ serf_bucket_aggregate_prepend(ctx->agg,
+ serf_bucket_simple_create(ctx->record_data,
+ FCGI_RECORD_SIZE,
+ NULL, NULL,
+
bucket->allocator));
+ return APR_SUCCESS;
+}
static apr_status_t serf_fcgi_frame_read(serf_bucket_t *bucket,
apr_size_t requested,
const char **data,
apr_size_t *len)
{
- return APR_ENOTIMPL;
+ fcgi_frame_ctx_t *ctx = bucket->data;
+ apr_status_t status;
+
+ if (ctx->agg) {
+ status = serf_bucket_read(ctx->agg, requested, data, len);
+
+ if (!APR_STATUS_IS_EOF(status))
+ return status;
+ }
+
+ status = serf_fcgi_frame_refill(bucket);
+ if (status) {
+ *len = 0;
+ return status;
+ }
+
+ status = serf_bucket_read(ctx->agg, requested, data, len);
+
+ if (APR_STATUS_IS_EOF(status) && !ctx->at_eof)
+ status = APR_SUCCESS;
+
+ return status;
}
static apr_status_t serf_fcgi_frame_read_iovec(serf_bucket_t *bucket,
@@ -625,30 +767,89 @@ static apr_status_t serf_fcgi_frame_read
struct iovec *vecs,
int *vecs_used)
{
- return APR_ENOTIMPL;
+ fcgi_frame_ctx_t *ctx = bucket->data;
+ apr_status_t status;
+
+ if (ctx->agg) {
+ status = serf_bucket_read_iovec(ctx->agg, requested, vecs_size,
+ vecs, vecs_used);
+
+ if (!APR_STATUS_IS_EOF(status))
+ return status;
+ }
+
+ status = serf_fcgi_frame_refill(bucket);
+ if (status) {
+ *vecs_used = 0;
+ return status;
+ }
+
+ status = serf_bucket_read_iovec(ctx->agg, requested, vecs_size,
+ vecs, vecs_used);
+
+ if (APR_STATUS_IS_EOF(status) && !ctx->at_eof)
+ status = APR_SUCCESS;
+
+ return status;
}
static apr_status_t serf_fcgi_frame_peek(serf_bucket_t *bucket,
const char **data,
apr_size_t *len)
{
- return APR_ENOTIMPL;
+ fcgi_frame_ctx_t *ctx = bucket->data;
+ apr_status_t status;
+
+ if (ctx->agg) {
+ status = serf_bucket_peek(ctx->agg, data, len);
+
+ if (!APR_STATUS_IS_EOF(status))
+ return status;
+ }
+
+ status = serf_fcgi_frame_refill(bucket);
+ if (status) {
+ *len = 0;
+ return status;
+ }
+
+ status = serf_bucket_peek(ctx->agg, data, len);
+
+ if (APR_STATUS_IS_EOF(status) && !ctx->at_eof)
+ status = APR_SUCCESS;
+
+ return status;
}
static void serf_fcgi_frame_destroy(serf_bucket_t *bucket)
{
+ fcgi_frame_ctx_t *ctx = bucket->data;
+
+ if (ctx->agg)
+ serf_bucket_destroy(ctx->agg);
+ if (ctx->stream)
+ serf_bucket_destroy(ctx->stream);
+
serf_default_destroy_and_data(bucket);
}
static apr_uint64_t serf_fcgi_frame_get_remaining(serf_bucket_t *bucket)
{
- return APR_ENOTIMPL;
+ return SERF_LENGTH_UNKNOWN;
}
static apr_status_t serf_fcgi_frame_set_config(serf_bucket_t *bucket,
- serf_config_t *config)
+ serf_config_t *config)
{
- return APR_ENOTIMPL;
+ fcgi_frame_ctx_t *ctx = bucket->data;
+
+ if (!ctx->agg)
+ ctx->agg = serf_bucket_aggregate_create(bucket->allocator);
+ serf_bucket_set_config(ctx->agg, config);
+ if (ctx->stream)
+ serf_bucket_set_config(ctx->stream, config);
+
+ return APR_SUCCESS;
}
extern const serf_bucket_type_t serf_bucket_type__fcgi_frame =
Modified: serf/trunk/protocols/fcgi_buckets.h
URL:
http://svn.apache.org/viewvc/serf/trunk/protocols/fcgi_buckets.h?rev=1714818&r1=1714817&r2=1714818&view=diff
==============================================================================
--- serf/trunk/protocols/fcgi_buckets.h (original)
+++ serf/trunk/protocols/fcgi_buckets.h Tue Nov 17 16:07:48 2015
@@ -83,6 +83,8 @@ serf_bucket_t *
serf__bucket_fcgi_frame_create(serf_bucket_t *stream,
apr_uint16_t stream_id,
apr_uint16_t frame_type,
+ bool send_as_stream,
+ bool send_eof,
serf_bucket_alloc_t *alloc);
#ifdef __cplusplus
Modified: serf/trunk/protocols/fcgi_protocol.c
URL:
http://svn.apache.org/viewvc/serf/trunk/protocols/fcgi_protocol.c?rev=1714818&r1=1714817&r2=1714818&view=diff
==============================================================================
--- serf/trunk/protocols/fcgi_protocol.c (original)
+++ serf/trunk/protocols/fcgi_protocol.c Tue Nov 17 16:07:48 2015
@@ -347,8 +347,76 @@ static apr_status_t fcgi_read(serf_fcgi_
return APR_SUCCESS;
}
+apr_status_t serf_fcgi__enqueue_frame(serf_fcgi_protocol_t *fcgi,
+ serf_bucket_t *frame,
+ bool pump)
+{
+ apr_status_t status;
+ bool want_write;
+
+ if (!pump && !*fcgi->dirty_pollset)
+ {
+ const char *data;
+ apr_size_t len;
+
+ /* Cheap check to see if we should request a write
+ event next time around */
+ status = serf_bucket_peek(fcgi->ostream, &data, &len);
+
+ if (SERF_BUCKET_READ_ERROR(status))
+ {
+ serf_bucket_destroy(frame);
+ return status;
+ }
+
+ if (len == 0)
+ {
+ *fcgi->dirty_pollset = true;
+ fcgi->ctx->dirty_pollset = true;
+ }
+ }
+
+ serf_bucket_aggregate_append(fcgi->ostream, frame);
+
+ if (!pump)
+ return APR_SUCCESS;
+
+ /* Flush final output buffer (after ssl, etc.) */
+ if (fcgi->conn)
+ status = serf__connection_flush(fcgi->conn, TRUE);
+ else
+ status = serf__incoming_client_flush(fcgi->client, TRUE);
+
+ want_write = APR_STATUS_IS_EAGAIN(status);
+
+ if ((want_write && !(*fcgi->req_events & APR_POLLOUT))
+ || (!want_write && (*fcgi->req_events & APR_POLLOUT)))
+ {
+ *fcgi->dirty_pollset = true;
+ fcgi->ctx->dirty_pollset = true;
+ }
+
+ return status;
+}
+
static apr_status_t fcgi_write(serf_fcgi_protocol_t *fcgi)
{
+ apr_status_t status;
+
+ if (fcgi->client)
+ status = serf__incoming_client_flush(fcgi->client, true);
+ else
+ status = serf__connection_flush(fcgi->conn, true);
+
+ if (APR_STATUS_IS_EAGAIN(status))
+ return APR_SUCCESS;
+ else if (status)
+ return status;
+
+ /* Probably nothing to write. */
+ *fcgi->dirty_pollset = true;
+ fcgi->ctx->dirty_pollset = true;
+
return APR_SUCCESS;
}
Modified: serf/trunk/protocols/fcgi_protocol.h
URL:
http://svn.apache.org/viewvc/serf/trunk/protocols/fcgi_protocol.h?rev=1714818&r1=1714817&r2=1714818&view=diff
==============================================================================
--- serf/trunk/protocols/fcgi_protocol.h (original)
+++ serf/trunk/protocols/fcgi_protocol.h Tue Nov 17 16:07:48 2015
@@ -195,6 +195,10 @@ apr_status_t serf_fcgi__setup_incoming_r
void **req_setup_baton,
serf_fcgi_protocol_t *fcgi);
+apr_status_t serf_fcgi__enqueue_frame(serf_fcgi_protocol_t *fcgi,
+ serf_bucket_t *frame,
+ bool pump);
+
/* From fcgi_stream.c */
serf_fcgi_stream_t * serf_fcgi__stream_create(serf_fcgi_protocol_t *fcgi,
Modified: serf/trunk/protocols/fcgi_stream.c
URL:
http://svn.apache.org/viewvc/serf/trunk/protocols/fcgi_stream.c?rev=1714818&r1=1714817&r2=1714818&view=diff
==============================================================================
--- serf/trunk/protocols/fcgi_stream.c (original)
+++ serf/trunk/protocols/fcgi_stream.c Tue Nov 17 16:07:48 2015
@@ -22,6 +22,7 @@
#include <apr_pools.h>
#include <apr_strings.h>
+#include <apr_date.h>
#include "serf.h"
#include "serf_bucket_util.h"
@@ -77,12 +78,59 @@ fcgi_stream_enqueue_response(serf_incomi
void *enqueue_baton,
serf_bucket_t *response_bkt)
{
- return APR_SUCCESS;
+ serf_fcgi_stream_t *stream = enqueue_baton;
+ serf_bucket_alloc_t *alloc = response_bkt->allocator;
+ serf_linebuf_t *linebuf;
+ serf_bucket_t *agg;
+ serf_bucket_t *tmp;
+ apr_status_t status;
+
+ /* With FCGI we don't send the usual first line of the response.
+ We just send a "Status: 200" instead and the actual http
+ server will handle the rest */
+ agg = serf_bucket_aggregate_create(alloc);
+
+ /* Too big for the stack :( */
+ linebuf = serf_bucket_mem_alloc(alloc, sizeof(*linebuf));
+ serf_linebuf_init(linebuf);
+
+ do
+ {
+ status = serf_linebuf_fetch(linebuf, response_bkt, SERF_NEWLINE_ANY);
+ } while (status == APR_SUCCESS && linebuf->state != SERF_LINEBUF_READY);
+
+ if (status
+ || linebuf->state != SERF_LINEBUF_READY
+ || !apr_date_checkmask(linebuf->line, "HTTP/#.# ###*"))
+ {
+ /* We can't write a response in this state yet :( */
+ serf_bucket_mem_free(alloc, linebuf);
+ return status;
+ }
+
+ tmp = SERF_BUCKET_SIMPLE_STRING("Status: ", alloc);
+ serf_bucket_aggregate_append(agg, tmp);
+
+ tmp = serf_bucket_simple_copy_create(linebuf->line + 9, 3, alloc);
+ serf_bucket_aggregate_append(agg, tmp);
+ serf_bucket_mem_free(alloc, linebuf);
+
+ tmp = SERF_BUCKET_SIMPLE_STRING("\r\n", alloc);
+ serf_bucket_aggregate_append(agg, tmp);
+
+ serf_bucket_aggregate_append(agg, response_bkt);
+
+ return serf_fcgi__enqueue_frame(
+ stream->fcgi,
+ serf__bucket_fcgi_frame_create(agg, stream->streamid,
+ FCGI_FRAMETYPE(FCGI_V1, FCGI_STDOUT),
+ true, true,
+ alloc), true);
}
static apr_status_t
stream_setup_request(serf_fcgi_stream_t *stream,
- serf_config_t *config)
+ serf_config_t *config)
{
serf_bucket_t *agg;
apr_status_t status;
Modified: serf/trunk/test/serf_httpd.c
URL:
http://svn.apache.org/viewvc/serf/trunk/test/serf_httpd.c?rev=1714818&r1=1714817&r2=1714818&view=diff
==============================================================================
--- serf/trunk/test/serf_httpd.c (original)
+++ serf/trunk/test/serf_httpd.c Tue Nov 17 16:07:48 2015
@@ -141,9 +141,6 @@ static apr_status_t request_generate_res
tmp = SERF_BUCKET_SIMPLE_STRING("HTTP/1.1 200 OK" CRLF, alloc);
serf_bucket_aggregate_append(agg, tmp);
- tmp = SERF_BUCKET_SIMPLE_STRING("Transfer-Encoding: chunked" CRLF, alloc);
- serf_bucket_aggregate_append(agg, tmp);
-
tmp = SERF_BUCKET_SIMPLE_STRING("Content-Type: text/plain" CRLF, alloc);
serf_bucket_aggregate_append(agg, tmp);
@@ -173,7 +170,7 @@ static apr_status_t request_generate_res
tmp = SERF_BUCKET_SIMPLE_STRING(CRLF, alloc);
serf_bucket_aggregate_append(body, tmp);
- tmp = serf_bucket_chunk_create(body, alloc);
+ tmp = body;
serf_bucket_aggregate_append(agg, tmp);
tmp = SERF_BUCKET_SIMPLE_STRING(CRLF, alloc);