Author: rhuijben
Date: Tue Nov 17 12:49:11 2015
New Revision: 1714774
URL: http://svn.apache.org/viewvc?rev=1714774&view=rev
Log:
Extend fcgi code with parameter decoding. This completes the server side
request parsing. Next step will be to hook up the incoming request routing
to get us a response to send back.
* buckets/fcgi_buckets.c
(serf_bucket_type__fcgi_params_decode): New bucket type.
* protocols/fcgi_buckets.h
(serf_bucket_type__fcgi_params_decode): New bucket type.
(serf__bucket_fcgi_params_decode_create): New function.
* protocols/fcgi_protocol.c
(SERF_ERROR_FCGI_RECORD_SIZE_ERROR,
SERF_ERROR_FCGI_PROTOCOL_ERROR): New defines. For now mapped to HTTP2
versions.
(fcgi_begin_request): New function.
(fcgi_process): Handle begin request, params and stdin records.
(fcgi_write): Return success instead of failure.
(fcgi_hangup): New function.
(move_to_head): New function.
(serf_fcgi__stream_get): New function.
(fcgi_outgoing_write,
fcgi_outgoing_hangup,
fcgi_outgoing_teardown): Forward to the right function.
(fcgi_server_write,
fcgi_server_hangup,
fcgi_server_teardown): Forward to the right function.
* protocols/fcgi_protocol.h
(*): Copy and paste a few structs from fcgi specs.
(serf_fcgi_stream_t): Store id and role.
(serf_fcgi__stream_get): New function.
(serf_fcgi__stream_create,
serf_fcgi__stream_processor,
serf_fcgi__stream_handle_params,
serf_fcgi__stream_handle_stdin): New function.
* protocols/fcgi_stream.c
(serf_fcgi_stream_data_t): Fill struct.
(serf_fcgi__stream_create): Add proper init.
(stream_agg_eof): New function.
(serf_fcgi__stream_handle_params,
serf_fcgi__stream_handle_stdin): New function.
(serf_fcgi__stream_processor): New function.
Modified:
serf/trunk/buckets/fcgi_buckets.c
serf/trunk/protocols/fcgi_buckets.h
serf/trunk/protocols/fcgi_protocol.c
serf/trunk/protocols/fcgi_protocol.h
serf/trunk/protocols/fcgi_stream.c
Modified: serf/trunk/buckets/fcgi_buckets.c
URL:
http://svn.apache.org/viewvc/serf/trunk/buckets/fcgi_buckets.c?rev=1714774&r1=1714773&r2=1714774&view=diff
==============================================================================
--- serf/trunk/buckets/fcgi_buckets.c (original)
+++ serf/trunk/buckets/fcgi_buckets.c Tue Nov 17 12:49:11 2015
@@ -259,8 +259,346 @@ extern const serf_bucket_type_t serf_buc
serf_fcgi_unframe_get_remaining,
serf_fcgi_unframe_set_config
};
+/* ==================================================================== */
+
+/* State of the bucket that decodes FastCGI name-value pairs read from
+ another bucket. */
+typedef struct fcgi_params_decode_ctx_t
+{
+ /* Bucket the encoded name-value pairs are read from */
+ serf_bucket_t *stream;
+
+ /* NOTE(review): not referenced by the decoding code next to this
+ struct; possibly reserved for later use -- confirm */
+ const char *last_data;
+ apr_size_t last_len;
+
+ /* Decoded lengths of the name and value currently being read */
+ apr_size_t key_sz;
+ apr_size_t val_sz;
+
+ /* Which part of the current pair is being read */
+ enum fcgi_param_decode_status_t
+ {
+ DS_SIZES = 0,
+ DS_KEY,
+ DS_VALUE,
+ } state;
+
+ /* Collects the length prefix of a pair: each of the two lengths is
+ 1 or 4 bytes, so at most 8 bytes */
+ char size_buffer[8];
+ /* Number of bytes collected so far for the current state */
+ apr_size_t tmp_size;
+
+ /* Buffers for the current name and value, taken from the bucket
+ allocator */
+ char *key;
+ char *val;
+
+ /* Values received for the "METHOD" and "REQUEST_URI" names */
+ const char *method;
+ const char *path;
+
+ /* Headers bucket that receives all other pairs; created on first use */
+ serf_bucket_t *headers;
+
+} fcgi_params_decode_ctx_t;
+
+/* Create a params-decode bucket that reads its input from STREAM.
+   The context is zero-initialized, so decoding starts in DS_SIZES. */
+serf_bucket_t *
+serf__bucket_fcgi_params_decode_create(serf_bucket_t *stream,
+                                       serf_bucket_alloc_t *alloc)
+{
+    fcgi_params_decode_ctx_t *decoder =
+        serf_bucket_mem_calloc(alloc, sizeof(fcgi_params_decode_ctx_t));
+
+    decoder->stream = stream;
+
+    return serf_bucket_create(&serf_bucket_type__fcgi_params_decode,
+                              alloc, decoder);
+}
+
+/* Return how many length-prefix bytes are needed in total, judging by
+   the bytes already collected in CTX->size_buffer. A length takes 4 bytes
+   when the high bit of its first byte is set and 1 byte otherwise, so the
+   answer is 2, 5 or 8 and may grow as more bytes become known. */
+static apr_size_t size_data_requested(fcgi_params_decode_ctx_t *ctx)
+{
+    const apr_size_t have = ctx->tmp_size;
+
+    /* Nothing known yet: assume two 1-byte lengths */
+    if (have < 1)
+        return 2;
+
+    if (ctx->size_buffer[0] & 0x80) {
+        /* 4-byte name length; the value length starts at offset 4 */
+        if (have > 4 && (ctx->size_buffer[4] & 0x80))
+            return 8;
+
+        return 5;
+    }
+
+    /* 1-byte name length; the value length starts at offset 1 */
+    if (have >= 2 && (ctx->size_buffer[1] & 0x80))
+        return 5;
+
+    return 2;
+}
+
+/* Consume the completed name/value pair stored in the decode context of
+   BUCKET (ctx->key / ctx->val) and take over ownership of both buffers.
+
+    - "HTTP_xxx" names (except "HTTP__FCGI_xxx") lose the "HTTP_" prefix,
+      get '_' replaced by '-' and are stored as a header.
+    - "METHOD" and "REQUEST_URI" values are kept for the request line.
+    - Every other name is stored as a header with an "_FCGI_" prefix.
+
+   NOTE(review): the CGI variable carrying the method is normally called
+   REQUEST_METHOD; verify which name the peer really sends. */
+void fcgi_handle_keypair(serf_bucket_t *bucket)
+{
+    fcgi_params_decode_ctx_t *ctx = bucket->data;
+    char *key = ctx->key;
+    char *val = ctx->val;
+
+    ctx->key = NULL;
+    ctx->val = NULL;
+
+    if (!ctx->headers)
+        ctx->headers = serf_bucket_headers_create(bucket->allocator);
+
+    if (strncasecmp(key, "HTTP_", 5) == 0
+        && strncasecmp(key + 5, "_FCGI_", 6) != 0)
+    {
+        apr_size_t i;
+
+        /* Shrink first, so that the loop below only touches the bytes
+           that are still part of the name. The +1 moves the NUL too. */
+        ctx->key_sz -= 5;
+        memmove(key, key + 5, ctx->key_sz + 1);
+        for (i = 0; i < ctx->key_sz; i++) {
+            if (key[i] == '_')
+                key[i] = '-';
+        }
+    }
+    else if (ctx->key_sz == 6 && !strcasecmp(key, "METHOD"))
+    {
+        /* Don't leak the previous value if the name is sent twice */
+        if (ctx->method)
+            serf_bucket_mem_free(bucket->allocator, (void *)ctx->method);
+
+        ctx->method = val;
+        serf_bucket_mem_free(bucket->allocator, key);
+        return;
+    }
+    else if (ctx->key_sz == 11 && !strcasecmp(key, "REQUEST_URI"))
+    {
+        /* Don't leak the previous value if the name is sent twice */
+        if (ctx->path)
+            serf_bucket_mem_free(bucket->allocator, (void *)ctx->path);
+
+        ctx->path = val;
+        serf_bucket_mem_free(bucket->allocator, key);
+        return;
+    }
+    else
+    {
+        /* The name buffer was allocated with 6 spare bytes for this */
+        memmove(key + 6, key, ctx->key_sz + 1);
+        memcpy(key, "_FCGI_", 6);
+        ctx->key_sz += 6;
+    }
+
+    /* Both strings are passed with the copy flag set, so our buffers
+       can be released right away */
+    serf_bucket_headers_setx(ctx->headers,
+                             key, ctx->key_sz, TRUE,
+                             val, ctx->val_sz, TRUE);
+    serf_bucket_mem_free(bucket->allocator, key);
+    serf_bucket_mem_free(bucket->allocator, val);
+}
+
+/* Decode as many name-value pairs as the inner stream can deliver right
+   now. All progress is kept in the decode context, so the function can
+   simply be called again after a non-final status such as APR_EAGAIN.
+
+   Returns APR_SUCCESS once the inner stream reported EOF on a pair
+   boundary, SERF_ERROR_TRUNCATED_STREAM when EOF arrives in the middle of
+   a pair, and otherwise the status of the last read from the stream. */
+apr_status_t fcgi_params_decode(serf_bucket_t *bucket)
+{
+    fcgi_params_decode_ctx_t *ctx = bucket->data;
+    apr_status_t status = APR_SUCCESS;
+
+    while (status == APR_SUCCESS) {
+        apr_size_t requested;
+        const char *data;
+        const unsigned char *udata;
+        apr_size_t len;
+
+        switch (ctx->state) {
+            case DS_SIZES:
+                requested = size_data_requested(ctx);
+                status = serf_bucket_read(ctx->stream,
+                                          requested - ctx->tmp_size,
+                                          &data, &len);
+                if (SERF_BUCKET_READ_ERROR(status))
+                    return status;
+
+                /* Always collect the prefix in size_buffer, so that
+                   size_data_requested() sees every byte read so far */
+                if (len) {
+                    memcpy(ctx->size_buffer + ctx->tmp_size, data, len);
+                    ctx->tmp_size += len;
+                }
+
+                if (ctx->tmp_size < size_data_requested(ctx)) {
+                    /* Read again. More bytes needed for
+                       determining lengths */
+                    break;
+                }
+
+                udata = (const unsigned char *)ctx->size_buffer;
+
+                if (udata[0] & 0x80) {
+                    ctx->key_sz = (udata[0] & 0x7F) << 24 | (udata[1] << 16)
+                                  | (udata[2] << 8) | (udata[3]);
+                    udata += 4;
+                }
+                else {
+                    ctx->key_sz = udata[0] & 0x7F;
+                    udata += 1;
+                }
+
+                if (udata[0] & 0x80) {
+                    ctx->val_sz = (udata[0] & 0x7F) << 24 | (udata[1] << 16)
+                                  | (udata[2] << 8) | (udata[3]);
+                    udata += 4;
+                }
+                else {
+                    ctx->val_sz = udata[0] & 0x7F;
+                    udata += 1;
+                }
+
+                ctx->tmp_size = 0;
+                ctx->state = DS_KEY;
+                break;
+            case DS_KEY:
+                /* Only ask for what is still missing; asking for more
+                   would overrun the name buffer after a partial read */
+                status = serf_bucket_read(ctx->stream,
+                                          ctx->key_sz - ctx->tmp_size,
+                                          &data, &len);
+                if (SERF_BUCKET_READ_ERROR(status))
+                    break;
+
+                if (!ctx->key) {
+                    /* +1 for the NUL, +6 so fcgi_handle_keypair() can
+                       prepend "_FCGI_" in place */
+                    ctx->key = serf_bucket_mem_alloc(bucket->allocator,
+                                                     ctx->key_sz + 1 + 6);
+                    ctx->key[ctx->key_sz] = 0;
+                }
+
+                if (len) {
+                    memcpy(ctx->key + ctx->tmp_size, data, len);
+                    ctx->tmp_size += len;
+                }
+
+                if (ctx->tmp_size == ctx->key_sz) {
+                    ctx->state = DS_VALUE;
+                    ctx->tmp_size = 0;
+                }
+                break;
+            case DS_VALUE:
+                /* As for the name: never read past the value's end */
+                status = serf_bucket_read(ctx->stream,
+                                          ctx->val_sz - ctx->tmp_size,
+                                          &data, &len);
+                if (SERF_BUCKET_READ_ERROR(status))
+                    break;
+
+                if (!ctx->val) {
+                    ctx->val = serf_bucket_mem_alloc(bucket->allocator,
+                                                     ctx->val_sz + 1);
+                    ctx->val[ctx->val_sz] = 0;
+                }
+
+                if (len) {
+                    memcpy(ctx->val + ctx->tmp_size, data, len);
+                    ctx->tmp_size += len;
+                }
+
+                if (ctx->tmp_size == ctx->val_sz) {
+                    fcgi_handle_keypair(bucket);
+                    ctx->state = DS_SIZES;
+                    ctx->tmp_size = 0;
+                }
+                break;
+        }
+    }
+
+    if (APR_STATUS_IS_EOF(status)) {
+        /* EOF is only fine between two pairs, or directly after the
+           length prefix of an entirely empty pair */
+        if ((ctx->state == DS_SIZES && !ctx->tmp_size)
+            || (ctx->state == DS_KEY && !ctx->key_sz && !ctx->val_sz))
+        {
+            return APR_SUCCESS;
+        }
+
+        return SERF_ERROR_TRUNCATED_STREAM;
+    }
+
+    return status;
+}
+
+/* Convert BUCKET in place: after serf_bucket_aggregate_become() the
+   decoded request line (when a method or path was received) and the
+   headers bucket are appended to it, and the decode context is released.
+   A missing method defaults to "GET" and a missing path to "/". */
+static void fcgi_serialize(serf_bucket_t *bucket)
+{
+    fcgi_params_decode_ctx_t *decoded = bucket->data;
+    serf_bucket_t *piece;
+
+    serf_bucket_aggregate_become(bucket);
+
+    if (decoded->method || decoded->path) {
+        /* Method; the simple-own bucket takes over the string */
+        if (!decoded->method)
+            piece = SERF_BUCKET_SIMPLE_STRING("GET", bucket->allocator);
+        else
+            piece = serf_bucket_simple_own_create(decoded->method,
+                                                  strlen(decoded->method),
+                                                  bucket->allocator);
+        serf_bucket_aggregate_append(bucket, piece);
+
+        piece = SERF_BUCKET_SIMPLE_STRING(" ", bucket->allocator);
+        serf_bucket_aggregate_append(bucket, piece);
+
+        /* Path; handled just like the method */
+        if (!decoded->path)
+            piece = SERF_BUCKET_SIMPLE_STRING("/", bucket->allocator);
+        else
+            piece = serf_bucket_simple_own_create(decoded->path,
+                                                  strlen(decoded->path),
+                                                  bucket->allocator);
+        serf_bucket_aggregate_append(bucket, piece);
+
+        piece = SERF_BUCKET_SIMPLE_STRING(" HTTP/2.0\r\n", bucket->allocator);
+        serf_bucket_aggregate_append(bucket, piece);
+    }
+
+    if (decoded->headers)
+        serf_bucket_aggregate_append(bucket, decoded->headers);
+
+    /* Release a half-read pair, if any, and then the context itself */
+    if (decoded->key)
+        serf_bucket_mem_free(bucket->allocator, decoded->key);
+    if (decoded->val)
+        serf_bucket_mem_free(bucket->allocator, decoded->val);
+
+    serf_bucket_mem_free(bucket->allocator, decoded);
+}
+
+/* Read handler. Nothing is returned until all params are decoded; from
+   then on the bucket is serialized and the read is forwarded to the read
+   handler of the bucket's new type. */
+static apr_status_t fcgi_params_decode_read(serf_bucket_t *bucket,
+                                            apr_size_t requested,
+                                            const char **data,
+                                            apr_size_t *len)
+{
+    apr_status_t decode_status = fcgi_params_decode(bucket);
+
+    if (decode_status == APR_SUCCESS) {
+        fcgi_serialize(bucket);
+        return bucket->type->read(bucket, requested, data, len);
+    }
+
+    /* Still decoding, or failed: no data for the caller */
+    *len = 0;
+    return decode_status;
+}
+
+/* Peek handler. Like the read handler it first drives the decoder and
+   only forwards the peek once the bucket has been serialized. */
+static apr_status_t fcgi_params_decode_peek(serf_bucket_t *bucket,
+                                            const char **data,
+                                            apr_size_t *len)
+{
+    apr_status_t decode_status = fcgi_params_decode(bucket);
+
+    if (decode_status == APR_SUCCESS) {
+        fcgi_serialize(bucket);
+        return bucket->type->peek(bucket, data, len);
+    }
+
+    /* Still decoding, or failed: nothing to peek at */
+    *len = 0;
+    return decode_status;
+}
+
+/* Destroy handler. fcgi_serialize() releases the decode context and hands
+ everything decoded so far to the bucket itself; the bucket is then
+ destroyed through bucket->type->destroy, presumably the handler of the
+ type installed by serf_bucket_aggregate_become(). */
+static void fcgi_params_decode_destroy(serf_bucket_t *bucket)
+{
+ fcgi_serialize(bucket);
+
+ bucket->type->destroy(bucket);
+}
+
+
+/* Bucket type of the params decoder. Only the read, peek and destroy
+ handlers are specific to this bucket; all other entries use serf's
+ default implementations. */
+extern const serf_bucket_type_t serf_bucket_type__fcgi_params_decode =
+{
+ "FCGI-PARAMS_DECODE",
+ fcgi_params_decode_read,
+ serf_default_readline,
+ serf_default_read_iovec,
+ serf_default_read_for_sendfile,
+ serf_buckets_are_v2,
+ fcgi_params_decode_peek,
+ fcgi_params_decode_destroy,
+ serf_default_read_bucket,
+ serf_default_get_remaining,
+ serf_default_ignore_config
+};
/* ==================================================================== */
serf_bucket_t *
@@ -327,3 +665,4 @@ extern const serf_bucket_type_t serf_buc
serf_fcgi_frame_get_remaining,
serf_fcgi_frame_set_config
};
+
Modified: serf/trunk/protocols/fcgi_buckets.h
URL:
http://svn.apache.org/viewvc/serf/trunk/protocols/fcgi_buckets.h?rev=1714774&r1=1714773&r2=1714774&view=diff
==============================================================================
--- serf/trunk/protocols/fcgi_buckets.h (original)
+++ serf/trunk/protocols/fcgi_buckets.h Tue Nov 17 12:49:11 2015
@@ -66,6 +66,15 @@ apr_status_t serf__bucket_fcgi_unframe_r
apr_uint16_t *frame_type);
/* ==================================================================== */
+/* Bucket that decodes FastCGI name-value pairs read from another bucket */
+extern const serf_bucket_type_t serf_bucket_type__fcgi_params_decode;
+#define SERF__BUCKET_IS_FCGI_PARAMS_DECODE(b) \
+ SERF_BUCKET_CHECK((b), _fcgi_params_decode)
+
+/* Create a params-decode bucket that reads the encoded pairs from STREAM
+ and is allocated from ALLOC. */
+serf_bucket_t *
+serf__bucket_fcgi_params_decode_create(serf_bucket_t *stream,
+ serf_bucket_alloc_t *alloc);
+
+/* ==================================================================== */
extern const serf_bucket_type_t serf_bucket_type__fcgi_frame;
#define SERF__BUCKET_IS_FCGI_FRAME(b) SERF_BUCKET_CHECK((b), _fcgi_frame)
Modified: serf/trunk/protocols/fcgi_protocol.c
URL:
http://svn.apache.org/viewvc/serf/trunk/protocols/fcgi_protocol.c?rev=1714774&r1=1714773&r2=1714774&view=diff
==============================================================================
--- serf/trunk/protocols/fcgi_protocol.c (original)
+++ serf/trunk/protocols/fcgi_protocol.c Tue Nov 17 12:49:11 2015
@@ -31,6 +31,9 @@
#include "protocols/fcgi_buckets.h"
#include "protocols/fcgi_protocol.h"
+#define SERF_ERROR_FCGI_RECORD_SIZE_ERROR SERF_ERROR_HTTP2_FRAME_SIZE_ERROR
+#define SERF_ERROR_FCGI_PROTOCOL_ERROR SERF_ERROR_HTTP2_PROTOCOL_ERROR
+
typedef struct serf_fcgi_protocol_t
{
serf_context_t *ctx;
@@ -53,6 +56,10 @@ typedef struct serf_fcgi_protocol_t
serf_bucket_t *read_frame;
bool in_frame;
+ serf_fcgi_stream_t *first, *last;
+
+ bool no_keep_conn;
+
} serf_fcgi_protocol_t;
static apr_status_t fcgi_cleanup(void *baton)
@@ -64,6 +71,31 @@ static apr_status_t fcgi_cleanup(void *b
return APR_SUCCESS;
}
+/* Implements serf_bucket_prefix_handler_t.
+ Handles the body of a FCGI_BEGIN_REQUEST record: stores the requested
+ role on the stream (BATON) and records on the connection when the peer
+ did not set FCGI_KEEP_CONN. Fails with
+ SERF_ERROR_FCGI_RECORD_SIZE_ERROR unless LEN is exactly the size of
+ FCGI_BeginRequestBody. */
+static apr_status_t fcgi_begin_request(void *baton,
+ serf_bucket_t *bucket,
+ const char *data,
+ apr_size_t len)
+{
+ serf_fcgi_stream_t *stream = baton;
+ const FCGI_BeginRequestBody *brb;
+
+ if (len != sizeof(*brb))
+ return SERF_ERROR_FCGI_RECORD_SIZE_ERROR;
+
+ brb = (const void*)data;
+
+ /* The role is a 16 bit value, sent high byte first */
+ stream->role = (brb->roleB1 << 8) | (brb->roleB0);
+
+ if (!(brb->flags & FCGI_KEEP_CONN))
+ stream->fcgi->no_keep_conn = true;
+
+
+ return APR_SUCCESS;
+}
+
+
/* Implements the serf_bucket_end_of_frame_t callback */
static apr_status_t
fcgi_end_of_frame(void *baton,
@@ -164,7 +196,6 @@ static apr_status_t fcgi_process(serf_fc
void *process_baton = NULL;
serf_bucket_t *process_bucket = NULL;
serf_fcgi_stream_t *stream;
- apr_uint32_t reset_reason;
status = serf__bucket_fcgi_unframe_read_info(body, &sid,
&frametype);
@@ -195,6 +226,25 @@ static apr_status_t fcgi_process(serf_fc
switch (frametype)
{
case FCGI_FRAMETYPE(FCGI_V1, FCGI_BEGIN_REQUEST):
+ stream = serf_fcgi__stream_get(fcgi, sid, FALSE, FALSE);
+
+ if (stream) {
+ /* Stream must be new */
+ return SERF_ERROR_FCGI_PROTOCOL_ERROR;
+ }
+ stream = serf_fcgi__stream_get(fcgi, sid, TRUE, TRUE);
+
+ remaining = (apr_size_t)serf_bucket_get_remaining(body);
+ if (remaining != sizeof(FCGI_BeginRequestBody)) {
+ return SERF_ERROR_FCGI_RECORD_SIZE_ERROR;
+ }
+ body = serf_bucket_prefix_create(
+ body,
+ sizeof(FCGI_BeginRequestBody),
+ fcgi_begin_request, stream,
+ fcgi->allocator);
+
+ /* Just reading will handle this frame now*/
process_bucket = body;
break;
case FCGI_FRAMETYPE(FCGI_V1, FCGI_ABORT_REQUEST):
@@ -204,10 +254,44 @@ static apr_status_t fcgi_process(serf_fc
process_bucket = body;
break;
case FCGI_FRAMETYPE(FCGI_V1, FCGI_PARAMS):
- process_bucket = body;
+ stream = serf_fcgi__stream_get(fcgi, sid, FALSE, FALSE);
+ if (!stream) {
+ return SERF_ERROR_FCGI_PROTOCOL_ERROR;
+ }
+
+ body = serf_fcgi__stream_handle_params(stream, body,
+ fcgi->allocator);
+
+ if (body) {
+ /* We will take care of discarding */
+ process_bucket = body;
+ }
+ else
+ {
+ /* The stream wants to handle the reading itself */
+ process_handler = serf_fcgi__stream_processor;
+ process_baton = stream;
+ }
break;
case FCGI_FRAMETYPE(FCGI_V1, FCGI_STDIN):
- process_bucket = body;
+ stream = serf_fcgi__stream_get(fcgi, sid, FALSE, FALSE);
+ if (!stream) {
+ return SERF_ERROR_FCGI_PROTOCOL_ERROR;
+ }
+
+ body = serf_fcgi__stream_handle_stdin(stream, body,
+ fcgi->allocator);
+
+ if (body) {
+ /* We will take care of discarding */
+ process_bucket = body;
+ }
+ else
+ {
+ /* The stream wants to handle the reading itself */
+ process_handler = serf_fcgi__stream_processor;
+ process_baton = stream;
+ }
break;
case FCGI_FRAMETYPE(FCGI_V1, FCGI_STDOUT):
process_bucket = body;
@@ -263,6 +347,11 @@ static apr_status_t fcgi_read(serf_fcgi_
static apr_status_t fcgi_write(serf_fcgi_protocol_t *fcgi)
{
+ return APR_SUCCESS;
+}
+
+static apr_status_t fcgi_hangup(serf_fcgi_protocol_t *fcgi)
+{
return APR_ENOTIMPL;
}
@@ -271,6 +360,52 @@ static apr_status_t fcgi_teardown(serf_f
return APR_ENOTIMPL;
}
+/* Placeholder: currently does nothing. serf_fcgi__stream_get() calls it
+ for a stream that was found but is not the first one in the list. */
+static void
+move_to_head(serf_fcgi_stream_t *stream)
+{
+ /* Not implemented yet */
+}
+
+/* Look up the stream with id STREAMID on FCGI.
+
+   Returns NULL for id 0. When the stream exists it is returned, after
+   calling move_to_head() if MOVE_FIRST is set and it is not already the
+   first stream. When it does not exist it is created and linked in as the
+   first stream if CREATE is set; otherwise NULL is returned. */
+serf_fcgi_stream_t *
+serf_fcgi__stream_get(serf_fcgi_protocol_t *fcgi,
+                      apr_uint16_t streamid,
+                      bool create,
+                      bool move_first)
+{
+    serf_fcgi_stream_t *cur;
+    serf_fcgi_stream_t *fresh;
+
+    if (streamid == 0)
+        return NULL;
+
+    cur = fcgi->first;
+    while (cur != NULL) {
+        if (cur->streamid == streamid) {
+            if (move_first && cur != fcgi->first)
+                move_to_head(cur);
+
+            return cur;
+        }
+        cur = cur->next;
+    }
+
+    if (!create)
+        return NULL;
+
+    fresh = serf_fcgi__stream_create(fcgi, streamid, fcgi->allocator);
+
+    if (fcgi->first == NULL) {
+        /* First stream on this connection */
+        fcgi->first = fresh;
+        fcgi->last = fresh;
+    }
+    else {
+        /* Link in before the current head */
+        fresh->next = fcgi->first;
+        fcgi->first->prev = fresh;
+        fcgi->first = fresh;
+    }
+
+    return fresh;
+}
+
/* --------------- connection support --------------- */
static apr_status_t fcgi_outgoing_read(serf_connection_t *conn)
@@ -287,21 +422,21 @@ static apr_status_t fcgi_outgoing_write(
{
serf_fcgi_protocol_t *fcgi = conn->protocol_baton;
- return fcgi_read(fcgi);
+ /* Forward to the write handler, like fcgi_server_write() does */
+ return fcgi_write(fcgi);
}
static apr_status_t fcgi_outgoing_hangup(serf_connection_t *conn)
{
serf_fcgi_protocol_t *fcgi = conn->protocol_baton;
- return fcgi_read(fcgi);
+ /* Forward to the hangup handler, like fcgi_server_hangup() does */
+ return fcgi_hangup(fcgi);
}
static apr_status_t fcgi_outgoing_teardown(serf_connection_t *conn)
{
serf_fcgi_protocol_t *fcgi = conn->protocol_baton;
- return fcgi_read(fcgi);
+ return fcgi_teardown(fcgi);
}
void serf__fcgi_protocol_init(serf_connection_t *conn)
@@ -353,21 +488,26 @@ static apr_status_t fcgi_server_write(se
{
serf_fcgi_protocol_t *fcgi = client->protocol_baton;
- return fcgi_read(fcgi);
+ if (!fcgi->stream) {
+ fcgi->stream = client->stream;
+ fcgi->ostream = client->ostream_tail;
+ }
+
+ return fcgi_write(fcgi);
}
static apr_status_t fcgi_server_hangup(serf_incoming_t *client)
{
serf_fcgi_protocol_t *fcgi = client->protocol_baton;
- return fcgi_read(fcgi);
+ return fcgi_hangup(fcgi);
}
static apr_status_t fcgi_server_teardown(serf_incoming_t *client)
{
serf_fcgi_protocol_t *fcgi = client->protocol_baton;
- return fcgi_read(fcgi);
+ return fcgi_teardown(fcgi);
}
void serf__fcgi_protocol_init_server(serf_incoming_t *client)
Modified: serf/trunk/protocols/fcgi_protocol.h
URL:
http://svn.apache.org/viewvc/serf/trunk/protocols/fcgi_protocol.h?rev=1714774&r1=1714773&r2=1714774&view=diff
==============================================================================
--- serf/trunk/protocols/fcgi_protocol.h (original)
+++ serf/trunk/protocols/fcgi_protocol.h Tue Nov 17 12:49:11 2015
@@ -46,6 +46,37 @@ typedef struct serf_fcgi_stream_data_t s
#define FCGI_V1 0x1
+/* From protocol specs */
+/*
+* Listening socket file number
+*/
+#define FCGI_LISTENSOCK_FILENO 0
+
+/* Header of a FastCGI record: eight single-byte fields, matching
+ FCGI_HEADER_LEN. Presumably the B1/B0 pairs hold the high and low byte
+ of a 16 bit value, as with roleB1/roleB0 -- confirm against the spec. */
+typedef struct FCGI_Header {
+ unsigned char version;
+ unsigned char type;
+ unsigned char requestIdB1;
+ unsigned char requestIdB0;
+ unsigned char contentLengthB1;
+ unsigned char contentLengthB0;
+ unsigned char paddingLength;
+ unsigned char reserved;
+} FCGI_Header;
+
+/*
+ * Number of bytes in a FCGI_Header. Future versions of the protocol
+ * will not reduce this number.
+ */
+#define FCGI_HEADER_LEN 8
+
+/*
+ * Value for version component of FCGI_Header
+ */
+#define FCGI_VERSION_1 1
+
+/*
+ * Values for type component of FCGI_Header
+ */
#define FCGI_BEGIN_REQUEST 1
#define FCGI_ABORT_REQUEST 2
#define FCGI_END_REQUEST 3
@@ -59,12 +90,85 @@ typedef struct serf_fcgi_stream_data_t s
#define FCGI_UNKNOWN_TYPE 11
#define FCGI_MAXTYPE (FCGI_UNKNOWN_TYPE)
+/*
+* Value for requestId component of FCGI_Header
+*/
+#define FCGI_NULL_REQUEST_ID 0
+
+/* Body of a FCGI_BEGIN_REQUEST record. The role is a 16 bit value with
+ roleB1 as its high byte; flags is tested against FCGI_KEEP_CONN. */
+typedef struct FCGI_BeginRequestBody {
+ unsigned char roleB1;
+ unsigned char roleB0;
+ unsigned char flags;
+ unsigned char reserved[5];
+} FCGI_BeginRequestBody;
+
+/* A complete begin-request record: record header followed by its body */
+typedef struct FCGI_BeginRequestRecord {
+ FCGI_Header header;
+ FCGI_BeginRequestBody body;
+} FCGI_BeginRequestRecord;
+
+/*
+ * Mask for flags component of FCGI_BeginRequestBody
+ */
+#define FCGI_KEEP_CONN 1
+
+/*
+ * Values for role component of FCGI_BeginRequestBody
+ */
+#define FCGI_RESPONDER 1
+#define FCGI_AUTHORIZER 2
+#define FCGI_FILTER 3
+
+/* Body of an end-request record. protocolStatus takes one of the
+ FCGI_REQUEST_COMPLETE .. FCGI_UNKNOWN_ROLE values; appStatusB3..B0 are
+ presumably the bytes of a 32 bit status, B3 being the highest --
+ confirm against the spec. */
+typedef struct FCGI_EndRequestBody {
+ unsigned char appStatusB3;
+ unsigned char appStatusB2;
+ unsigned char appStatusB1;
+ unsigned char appStatusB0;
+ unsigned char protocolStatus;
+ unsigned char reserved[3];
+} FCGI_EndRequestBody;
+
+/* A complete end-request record: record header followed by its body */
+typedef struct FCGI_EndRequestRecord {
+ FCGI_Header header;
+ FCGI_EndRequestBody body;
+} FCGI_EndRequestRecord;
+
+/*
+* Values for protocolStatus component of FCGI_EndRequestBody
+*/
+#define FCGI_REQUEST_COMPLETE 0
+#define FCGI_CANT_MPX_CONN 1
+#define FCGI_OVERLOADED 2
+#define FCGI_UNKNOWN_ROLE 3
+
+/*
+* Variable names for FCGI_GET_VALUES / FCGI_GET_VALUES_RESULT records
+*/
+#define FCGI_MAX_CONNS "FCGI_MAX_CONNS"
+#define FCGI_MAX_REQS "FCGI_MAX_REQS"
+#define FCGI_MPXS_CONNS "FCGI_MPXS_CONNS"
+
+/* Body of an unknown-type record; `type' presumably echoes the record
+ type that was not understood -- confirm against the spec. */
+typedef struct FCGI_UnknownTypeBody {
+ unsigned char type;
+ unsigned char reserved[7];
+} FCGI_UnknownTypeBody;
+
+/* A complete unknown-type record: record header followed by its body */
+typedef struct FCGI_UnknownTypeRecord {
+ FCGI_Header header;
+ FCGI_UnknownTypeBody body;
+} FCGI_UnknownTypeRecord;
+
+
+/**************************************************/
typedef struct serf_fcgi_stream_t
{
- struct serf_fcgi_protocol_t *h2;
+ struct serf_fcgi_protocol_t *fcgi;
serf_bucket_alloc_t *alloc;
+ apr_uint16_t streamid;
+ apr_uint16_t role;
+
/* Opaque implementation details */
serf_fcgi_stream_data_t *data;
@@ -78,6 +182,31 @@ typedef apr_status_t(*serf_fcgi_processo
serf_bucket_t *body);
+/* From fcgi_protocol.c */
+
+/* Find the stream with id STREAMID on FCGI. Returns NULL for id 0 and for
+ an unknown id when CREATE is false; with CREATE set an unknown stream
+ is created and becomes the first stream in the list. */
+serf_fcgi_stream_t * serf_fcgi__stream_get(serf_fcgi_protocol_t *fcgi,
+ apr_uint16_t streamid,
+ bool create,
+ bool move_first);
+
+
+/* From fcgi_stream.c */
+
+/* Allocate a new, unlinked stream with id STREAMID from ALLOC */
+serf_fcgi_stream_t * serf_fcgi__stream_create(serf_fcgi_protocol_t *fcgi,
+ apr_uint16_t streamid,
+ serf_bucket_alloc_t *alloc);
+
+/* Processor used when a stream reads its records itself; BATON is the
+ serf_fcgi_stream_t */
+apr_status_t serf_fcgi__stream_processor(void *baton,
+ serf_fcgi_protocol_t *fcgi,
+ serf_bucket_t *body);
+
+/* Hand the BODY of a params record to STREAM. A NULL result means the
+ stream took over BODY and reads it via serf_fcgi__stream_processor. */
+serf_bucket_t * serf_fcgi__stream_handle_params(serf_fcgi_stream_t *stream,
+ serf_bucket_t *body,
+ serf_bucket_alloc_t *alloc);
+
+/* Hand the BODY of a stdin record to STREAM. A NULL result means the
+ stream took over BODY and reads it via serf_fcgi__stream_processor. */
+serf_bucket_t * serf_fcgi__stream_handle_stdin(serf_fcgi_stream_t *stream,
+ serf_bucket_t *body,
+ serf_bucket_alloc_t *alloc);
+
+
#ifdef __cplusplus
}
Modified: serf/trunk/protocols/fcgi_stream.c
URL:
http://svn.apache.org/viewvc/serf/trunk/protocols/fcgi_stream.c?rev=1714774&r1=1714773&r2=1714774&view=diff
==============================================================================
--- serf/trunk/protocols/fcgi_stream.c (original)
+++ serf/trunk/protocols/fcgi_stream.c Tue Nov 17 12:49:11 2015
@@ -32,26 +32,167 @@
struct serf_fcgi_stream_data_t
{
- int dummy;
+ serf_bucket_t *req_agg;
+ bool headers_eof;
+ bool stdin_eof;
+
+ serf_request_t *request;
+ serf_incoming_request_t *in_request;
};
-serf_fcgi_stream_t *
-serf_fcgi__stream_create(serf_fcgi_protocol_t *h2,
- apr_int32_t streamid,
- apr_uint32_t lr_window,
- apr_uint32_t rl_window,
- serf_bucket_alloc_t *alloc)
+/* Allocate a stream with id STREAMID for FCGI from ALLOC. The stream is
+ returned unlinked (next and prev are NULL) and with zeroed private data;
+ linking it into the connection is left to the caller. */
+serf_fcgi_stream_t * serf_fcgi__stream_create(serf_fcgi_protocol_t *fcgi,
+ apr_uint16_t streamid,
+ serf_bucket_alloc_t *alloc)
{
serf_fcgi_stream_t *stream = serf_bucket_mem_alloc(alloc,
sizeof(*stream));
- stream->h2 = h2;
+ stream->fcgi = fcgi;
stream->alloc = alloc;
+ stream->streamid = streamid;
+ /* The memory is not zeroed; the real role is only known once the
+ begin request body has been handled */
+ stream->role = 0;
stream->next = stream->prev = NULL;
/* Delay creating this? */
- stream->data = serf_bucket_mem_alloc(alloc, sizeof(*stream->data));
+ stream->data = serf_bucket_mem_calloc(alloc, sizeof(*stream->data));
return stream;
}
+
+/* Hold-open callback of the request aggregate (what requests will see as
+   the actual body): report APR_EAGAIN until the end of stdin has been
+   seen on the stream, and APR_EOF from then on. */
+static apr_status_t stream_agg_eof(void *baton,
+                                   serf_bucket_t *bucket)
+{
+    serf_fcgi_stream_t *owner = baton;
+
+    return owner->data->stdin_eof ? APR_EOF : APR_EAGAIN;
+}
+
+/* Queue the BODY of a params record on the request aggregate of STREAM,
+   creating the aggregate on first use. A body that still has data is
+   wrapped in a params-decode bucket; an empty body marks the end of the
+   headers. Always returns NULL: the stream keeps BODY. */
+serf_bucket_t * serf_fcgi__stream_handle_params(serf_fcgi_stream_t *stream,
+                                                serf_bucket_t *body,
+                                                serf_bucket_alloc_t *alloc)
+{
+    serf_fcgi_stream_data_t *sd = stream->data;
+
+    if (sd->req_agg == NULL) {
+        sd->req_agg = serf_bucket_aggregate_create(stream->alloc);
+
+        serf_bucket_aggregate_hold_open(sd->req_agg,
+                                        stream_agg_eof, stream);
+    }
+
+    if ((apr_size_t)serf_bucket_get_remaining(body) != 0)
+        body = serf__bucket_fcgi_params_decode_create(body, body->allocator);
+    else
+        sd->headers_eof = true;
+
+    /* Wrapped or empty, the body goes into the aggregate */
+    serf_bucket_aggregate_append(sd->req_agg, body);
+
+    return NULL;
+}
+
+/* Queue the BODY of a stdin record on the request aggregate of STREAM,
+   creating the aggregate on first use. An empty body marks the end of
+   stdin. Asserts that the headers are complete. Always returns NULL: the
+   stream keeps BODY. */
+serf_bucket_t * serf_fcgi__stream_handle_stdin(serf_fcgi_stream_t *stream,
+                                               serf_bucket_t *body,
+                                               serf_bucket_alloc_t *alloc)
+{
+    serf_fcgi_stream_data_t *sd;
+
+    SERF_FCGI_assert(stream->data->headers_eof);
+
+    sd = stream->data;
+    if (sd->req_agg == NULL) {
+        sd->req_agg = serf_bucket_aggregate_create(stream->alloc);
+
+        serf_bucket_aggregate_hold_open(sd->req_agg,
+                                        stream_agg_eof, stream);
+    }
+
+    if ((apr_size_t)serf_bucket_get_remaining(body) == 0)
+        sd->stdin_eof = true;
+
+    /* Empty or not, the body goes into the aggregate */
+    serf_bucket_aggregate_append(sd->req_agg, body);
+
+    return NULL;
+}
+
+
+/* Processor for records that a stream reads itself; BATON is the
+ serf_fcgi_stream_t. When the stream has an incoming request its handler
+ is run first; after that the request aggregate is drained, and it is
+ destroyed once it reports EOF. Returns the last status seen. */
+apr_status_t serf_fcgi__stream_processor(void *baton,
+ serf_fcgi_protocol_t *fcgi,
+ serf_bucket_t *body)
+{
+ serf_fcgi_stream_t *stream = baton;
+ apr_status_t status = APR_SUCCESS;
+
+ SERF_FCGI_assert(stream->data->req_agg != NULL);
+
+ if (stream->data->request) {
+ /* ### TODO */
+ }
+ else if (stream->data->in_request) {
+ serf_incoming_request_t *request = stream->data->in_request;
+
+ SERF_FCGI_assert(request->req_bkt != NULL);
+
+ status = request->handler(request, request->req_bkt,
+ request->handler_baton,
+ request->pool);
+
+ /* Anything that is neither EOF nor a read error (this includes
+ APR_SUCCESS) goes straight back to the caller */
+ if (!APR_STATUS_IS_EOF(status)
+ && !SERF_BUCKET_READ_ERROR(status))
+ return status;
+
+ /* EOF from the handler: the request is complete, so set up the
+ response */
+ if (APR_STATUS_IS_EOF(status)) {
+ status = serf_incoming_response_create(request);
+
+ if (status)
+ return status;
+ }
+
+ if (SERF_BUCKET_READ_ERROR(status)) {
+
+ /* SEND ERROR status */
+
+ return status;
+ }
+ }
+
+ /* Read whatever is left in the aggregate until it stops reporting
+ APR_SUCCESS */
+ while (!status)
+ {
+ struct iovec vecs[IOV_MAX];
+ int vecs_used;
+
+ /* Drain the bucket as efficiently as possible */
+ status = serf_bucket_read_iovec(stream->data->req_agg,
+ SERF_READ_ALL_AVAIL,
+ IOV_MAX, vecs, &vecs_used);
+
+ if (vecs_used) {
+ /* We have data... What should we do with it? */
+ /*int i;
+
+ for (i = 0; i < vecs_used; i++)
+ fprintf(stderr, "%.*s", vecs[i].iov_len, vecs[i].iov_base);*/
+ }
+ }
+
+ if (APR_STATUS_IS_EOF(status))
+ {
+ /* If there was a request, it is already gone, so we can now safely
+ destroy our aggregate, which may include everything up to the
+ record bodies that were appended to it */
+ serf_bucket_destroy(stream->data->req_agg);
+ stream->data->req_agg = NULL;
+ }
+
+ return status;
+}
+