Author: rhuijben Date: Tue Nov 17 12:49:11 2015 New Revision: 1714774 URL: http://svn.apache.org/viewvc?rev=1714774&view=rev Log: Extend fcgi code with parameter decoding. This completes the server side request parsing. Next step will be to hook up the incoming request routing to get us a response to send back.
* buckets/fcgi_buckets.c (serf_bucket_type__fcgi_params_decode): New bucket type. * protocols/fcgi_buckets.h (serf_bucket_type__fcgi_params_decode): New bucket type. (serf__bucket_fcgi_params_decode_create): New function. * protocols/fcgi_protocol.c (SERF_ERROR_FCGI_RECORD_SIZE_ERROR, SERF_ERROR_FCGI_PROTOCOL_ERROR): New defines. For now mapped to HTTP2 versions. (fcgi_begin_request): New function. (fcgi_process): Handle begin request, params and stdin records. (fcgi_write): Return success instead of failure. (fcgi_hangup): New function. (move_to_head): New function. (serf_fcgi__stream_get): New function. (fcgi_outgoing_write, fcgi_outgoing_hangup, fcgi_outgoing_teardown): Forward to the right function. (fcgi_server_write, fcgi_server_hangup, fcgi_server_teardown): Forward to the right function. * protocols/fcgi_protocol.h (*): Copy and paste a few structs from fcgi specs. (serf_fcgi_stream_t): Store id and role. (serf_fcgi__stream_get): New function. (serf_fcgi__stream_create, serf_fcgi__stream_processor, serf_fcgi__stream_handle_params, serf_fcgi__stream_handle_stdin): New function. * protocols/fcgi_stream.c (serf_fcgi_stream_data_t): Fill struct. (serf_fcgi__stream_create): Add proper init. (stream_agg_eof): New function. (serf_fcgi__stream_handle_params, serf_fcgi__stream_handle_stdin): New function. (serf_fcgi__stream_processor): New function. Modified: serf/trunk/buckets/fcgi_buckets.c serf/trunk/protocols/fcgi_buckets.h serf/trunk/protocols/fcgi_protocol.c serf/trunk/protocols/fcgi_protocol.h serf/trunk/protocols/fcgi_stream.c Modified: serf/trunk/buckets/fcgi_buckets.c URL: http://svn.apache.org/viewvc/serf/trunk/buckets/fcgi_buckets.c?rev=1714774&r1=1714773&r2=1714774&view=diff ============================================================================== --- serf/trunk/buckets/fcgi_buckets.c (original) +++ serf/trunk/buckets/fcgi_buckets.c Tue Nov 17 12:49:11 2015 @@ -259,8 +259,346 @@ extern const serf_bucket_type_t serf_buc serf_fcgi_unframe_get_remaining, serf_fcgi_unframe_set_config }; +/* ==================================================================== */ + +typedef struct fcgi_params_decode_ctx_t +{ + serf_bucket_t *stream; + + const char *last_data; + apr_size_t last_len; + + apr_size_t key_sz; + apr_size_t val_sz; + + enum fcgi_param_decode_status_t + { + DS_SIZES = 0, + DS_KEY, + DS_VALUE, + } state; + + char size_buffer[8]; + apr_size_t tmp_size; + + char *key; + char *val; + + const char *method; + const char *path; + + serf_bucket_t *headers; + +} fcgi_params_decode_ctx_t; + +serf_bucket_t * +serf__bucket_fcgi_params_decode_create(serf_bucket_t *stream, + serf_bucket_alloc_t *alloc) +{ + fcgi_params_decode_ctx_t *ctx; + + ctx = serf_bucket_mem_calloc(alloc, sizeof(*ctx)); + ctx->stream = stream; + + return serf_bucket_create(&serf_bucket_type__fcgi_params_decode, alloc, + ctx); +} + +static apr_size_t size_data_requested(fcgi_params_decode_ctx_t *ctx) +{ + apr_size_t requested; + + if (ctx->tmp_size < 1) + requested = 2; + else if (ctx->size_buffer[0] & 0x80) { + requested = 5; + + if (ctx->tmp_size > 4 + && ctx->size_buffer[4] & 0x80) { + requested = 8; + } + } + else if (ctx->tmp_size >= 2 + && ctx->size_buffer[1] & 0x80) { + requested = 5; + } + else + requested = 2; + + return requested; +} + +void fcgi_handle_keypair(serf_bucket_t *bucket) +{ + fcgi_params_decode_ctx_t *ctx = bucket->data; + char *key = ctx->key; + char *val = ctx->val; + + ctx->key = NULL; + ctx->val = NULL; + + if (!ctx->headers) + ctx->headers = serf_bucket_headers_create(bucket->allocator); + + if (strncasecmp(key, "HTTP_", 5) == 0 + && strncasecmp(key + 5, "_FCGI_", 6) != 0) + { + apr_size_t i; + memmove(key, key + 5, ctx->key_sz - 5 + 1); + for (i = 0; i < ctx->key_sz; i++) { + if (key[i] == '_') + key[i] = '-'; + } + ctx->key_sz -= 5; + } + else if (ctx->key_sz == 6 && !strcasecmp(key, "METHOD")) + { + ctx->method = val; + serf_bucket_mem_free(bucket->allocator, key); + return; + } + else if (ctx->key_sz == 11 && !strcasecmp(key, "REQUEST_URI")) + { + ctx->path = val; + serf_bucket_mem_free(bucket->allocator, key); + return; + } + else + { + memmove(key + 6, key, ctx->key_sz + 1); + memcpy(key, "_FCGI_", 6); + ctx->key_sz += 6; + } + + serf_bucket_headers_setx(ctx->headers, + key, ctx->key_sz, TRUE, + val, ctx->val_sz, TRUE); + serf_bucket_mem_free(bucket->allocator, key); + serf_bucket_mem_free(bucket->allocator, val); +} + +apr_status_t fcgi_params_decode(serf_bucket_t *bucket) +{ + fcgi_params_decode_ctx_t *ctx = bucket->data; + apr_status_t status = APR_SUCCESS; + + while (status == APR_SUCCESS) { + apr_size_t requested; + const char *data; + const unsigned char *udata; + apr_size_t len; + + switch (ctx->state) { + case DS_SIZES: + requested = size_data_requested(ctx); + status = serf_bucket_read(ctx->stream, + requested - ctx->tmp_size, + &data, &len); + if (SERF_BUCKET_READ_ERROR(status)) + return status; + + if (len < requested) { + memcpy(ctx->size_buffer + ctx->tmp_size, data, len); + ctx->tmp_size += len; + + len = ctx->tmp_size; + data = ctx->size_buffer; + } + + if (size_data_requested(ctx) < len) { + /* Read again. More bytes needed for + determining lengths */ + if (data != ctx->size_buffer) { + memcpy(ctx->size_buffer, data, len); + ctx->tmp_size = len; + } + break; + } + + udata = (const unsigned char*)data; + + if (udata[0] & 0x80) { + ctx->key_sz = (udata[0] & 0x7F) << 24 | (udata[1] << 16) + | (udata[2] << 8) | (udata[3]); + udata += 4; + } + else { + ctx->key_sz = udata[0] & 0x7F; + udata += 1; + } + + if (udata[0] & 0x80) { + ctx->val_sz = (udata[0] & 0x7F) << 24 | (udata[1] << 16) + | (udata[2] << 8) | (udata[3]); + udata += 4; + } + else { + ctx->val_sz = udata[0] & 0x7F; + udata += 1; + } + + ctx->tmp_size = 0; + ctx->state++; + break; + case DS_KEY: + status = serf_bucket_read(ctx->stream, ctx->key_sz, + &data, &len); + if (SERF_BUCKET_READ_ERROR(status)) + break; + + if (!ctx->key) { + ctx->key = serf_bucket_mem_alloc(bucket->allocator, + ctx->key_sz + 1 + 6); + ctx->key[ctx->key_sz] = 0; + } + + memcpy(ctx->key + ctx->tmp_size, data, len); + ctx->tmp_size += len; + + if (ctx->tmp_size == ctx->key_sz) { + ctx->state++; + ctx->tmp_size = 0; + } + break; + case DS_VALUE: + status = serf_bucket_read(ctx->stream, ctx->val_sz, + &data, &len); + if (SERF_BUCKET_READ_ERROR(status)) + break; + if (!ctx->val) { + ctx->val = serf_bucket_mem_alloc(bucket->allocator, + ctx->val_sz + 1); + ctx->val[ctx->val_sz] = 0; + } + + if (len == ctx->val_sz) + ctx->state++; + + memcpy(ctx->val + ctx->tmp_size, data, len); + ctx->tmp_size += len; + + if (ctx->tmp_size == ctx->val_sz) { + + fcgi_handle_keypair(bucket); + ctx->state = DS_SIZES; + ctx->tmp_size = 0; + } + break; + } + } + + if (APR_STATUS_IS_EOF(status)) { + if (ctx->state == DS_SIZES && !ctx->tmp_size + || (ctx->state == DS_KEY && !ctx->key_sz && !ctx->val_sz)) + { + return APR_SUCCESS; + } + + return SERF_ERROR_TRUNCATED_STREAM; + } + + return status; +} + +static void fcgi_serialize(serf_bucket_t *bucket) +{ + fcgi_params_decode_ctx_t *ctx = bucket->data; + serf_bucket_t *tmp; + + serf_bucket_aggregate_become(bucket); + + if (ctx->method || ctx->path) { + if (ctx->method) { + tmp = serf_bucket_simple_own_create(ctx->method, strlen(ctx->method), + bucket->allocator); + } + else + tmp = SERF_BUCKET_SIMPLE_STRING("GET", bucket->allocator); + serf_bucket_aggregate_append(bucket, tmp); + + tmp = SERF_BUCKET_SIMPLE_STRING(" ", bucket->allocator); + serf_bucket_aggregate_append(bucket, tmp); + + if (ctx->path) { + tmp = serf_bucket_simple_own_create(ctx->path, strlen(ctx->path), + bucket->allocator); + } + else + tmp = SERF_BUCKET_SIMPLE_STRING("/", bucket->allocator); + serf_bucket_aggregate_append(bucket, tmp); + + tmp = SERF_BUCKET_SIMPLE_STRING(" HTTP/2.0\r\n", bucket->allocator); + serf_bucket_aggregate_append(bucket, tmp); + } + + if (ctx->headers) + serf_bucket_aggregate_append(bucket, ctx->headers); + + if (ctx->key) + serf_bucket_mem_free(bucket->allocator, ctx->key); + if (ctx->val) + serf_bucket_mem_free(bucket->allocator, ctx->val); + + serf_bucket_mem_free(bucket->allocator, ctx); +} + +static apr_status_t fcgi_params_decode_read(serf_bucket_t *bucket, + apr_size_t requested, + const char **data, + apr_size_t *len) +{ + apr_status_t status; + + status = fcgi_params_decode(bucket); + + if (status) { + *len = 0; + return status; + } + + fcgi_serialize(bucket); + return bucket->type->read(bucket, requested, data, len); +} + +static apr_status_t fcgi_params_decode_peek(serf_bucket_t *bucket, + const char **data, + apr_size_t *len) +{ + apr_status_t status; + status = fcgi_params_decode(bucket); + if (status) { + *len = 0; + return status; + } + + fcgi_serialize(bucket); + return bucket->type->peek(bucket, data, len); +} + +static void fcgi_params_decode_destroy(serf_bucket_t *bucket) +{ + fcgi_serialize(bucket); + + bucket->type->destroy(bucket); +} + + +extern const serf_bucket_type_t serf_bucket_type__fcgi_params_decode = +{ + "FCGI-PARAMS_DECODE", + fcgi_params_decode_read, + serf_default_readline, + serf_default_read_iovec, + serf_default_read_for_sendfile, + serf_buckets_are_v2, + fcgi_params_decode_peek, + fcgi_params_decode_destroy, + serf_default_read_bucket, + serf_default_get_remaining, + serf_default_ignore_config +}; /* ==================================================================== */ serf_bucket_t * @@ -327,3 +665,4 @@ extern const serf_bucket_type_t serf_buc serf_fcgi_frame_get_remaining, serf_fcgi_frame_set_config }; + Modified: serf/trunk/protocols/fcgi_buckets.h URL: http://svn.apache.org/viewvc/serf/trunk/protocols/fcgi_buckets.h?rev=1714774&r1=1714773&r2=1714774&view=diff ============================================================================== --- serf/trunk/protocols/fcgi_buckets.h (original) +++ serf/trunk/protocols/fcgi_buckets.h Tue Nov 17 12:49:11 2015 @@ -66,6 +66,15 @@ apr_status_t serf__bucket_fcgi_unframe_r apr_uint16_t *frame_type); /* ==================================================================== */ +extern const serf_bucket_type_t serf_bucket_type__fcgi_params_decode; +#define SERF__BUCKET_IS_FCGI_PARAMS_DECODE(b) \ + SERF_BUCKET_CHECK((b), _fcgi_params_decode) + +serf_bucket_t * +serf__bucket_fcgi_params_decode_create(serf_bucket_t *stream, + serf_bucket_alloc_t *alloc); + +/* ==================================================================== */ extern const serf_bucket_type_t serf_bucket_type__fcgi_frame; #define SERF__BUCKET_IS_FCGI_FRAME(b) SERF_BUCKET_CHECK((b), _fcgi_frame) Modified: serf/trunk/protocols/fcgi_protocol.c URL: http://svn.apache.org/viewvc/serf/trunk/protocols/fcgi_protocol.c?rev=1714774&r1=1714773&r2=1714774&view=diff ============================================================================== --- serf/trunk/protocols/fcgi_protocol.c (original) +++ serf/trunk/protocols/fcgi_protocol.c Tue Nov 17 12:49:11 2015 @@ -31,6 +31,9 @@ #include "protocols/fcgi_buckets.h" #include "protocols/fcgi_protocol.h" +#define SERF_ERROR_FCGI_RECORD_SIZE_ERROR SERF_ERROR_HTTP2_FRAME_SIZE_ERROR +#define SERF_ERROR_FCGI_PROTOCOL_ERROR SERF_ERROR_HTTP2_PROTOCOL_ERROR + typedef struct serf_fcgi_protocol_t { serf_context_t *ctx; @@ -53,6 +56,10 @@ typedef struct serf_fcgi_protocol_t serf_bucket_t *read_frame; bool in_frame; + serf_fcgi_stream_t *first, *last; + + bool no_keep_conn; + } serf_fcgi_protocol_t; static apr_status_t fcgi_cleanup(void *baton) @@ -64,6 +71,31 @@ static apr_status_t fcgi_cleanup(void *b return APR_SUCCESS; } +/* Implements serf_bucket_prefix_handler_t. + Handles PING frames for pings initiated locally */ +static apr_status_t fcgi_begin_request(void *baton, + serf_bucket_t *bucket, + const char *data, + apr_size_t len) +{ + serf_fcgi_stream_t *stream = baton; + const FCGI_BeginRequestBody *brb; + + if (len != sizeof(*brb)) + return SERF_ERROR_FCGI_RECORD_SIZE_ERROR; + + brb = (const void*)data; + + stream->role = (brb->roleB1 << 8) | (brb->roleB0); + + if (!(brb->flags & FCGI_KEEP_CONN)) + stream->fcgi->no_keep_conn = true; + + + return APR_SUCCESS; +} + + /* Implements the serf_bucket_end_of_frame_t callback */ static apr_status_t fcgi_end_of_frame(void *baton, @@ -164,7 +196,6 @@ static apr_status_t fcgi_process(serf_fc void *process_baton = NULL; serf_bucket_t *process_bucket = NULL; serf_fcgi_stream_t *stream; - apr_uint32_t reset_reason; status = serf__bucket_fcgi_unframe_read_info(body, &sid, &frametype); @@ -195,6 +226,25 @@ static apr_status_t fcgi_process(serf_fc switch (frametype) { case FCGI_FRAMETYPE(FCGI_V1, FCGI_BEGIN_REQUEST): + stream = serf_fcgi__stream_get(fcgi, sid, FALSE, FALSE); + + if (stream) { + /* Stream must be new */ + return SERF_ERROR_FCGI_PROTOCOL_ERROR; + } + stream = serf_fcgi__stream_get(fcgi, sid, TRUE, TRUE); + + remaining = (apr_size_t)serf_bucket_get_remaining(body); + if (remaining != sizeof(FCGI_BeginRequestBody)) { + return SERF_ERROR_FCGI_RECORD_SIZE_ERROR; + } + body = serf_bucket_prefix_create( + body, + sizeof(FCGI_BeginRequestBody), + fcgi_begin_request, stream, + fcgi->allocator); + + /* Just reading will handle this frame now*/ process_bucket = body; break; case FCGI_FRAMETYPE(FCGI_V1, FCGI_ABORT_REQUEST): @@ -204,10 +254,44 @@ static apr_status_t fcgi_process(serf_fc process_bucket = body; break; case FCGI_FRAMETYPE(FCGI_V1, FCGI_PARAMS): - process_bucket = body; + stream = serf_fcgi__stream_get(fcgi, sid, FALSE, FALSE); + if (!stream) { + return SERF_ERROR_FCGI_PROTOCOL_ERROR; + } + + body = serf_fcgi__stream_handle_params(stream, body, + fcgi->allocator); + + if (body) { + /* We will take care of discarding */ + process_bucket = body; + } + else + { + /* The stream wants to handle the reading itself */ + process_handler = serf_fcgi__stream_processor; + process_baton = stream; + } break; case FCGI_FRAMETYPE(FCGI_V1, FCGI_STDIN): - process_bucket = body; + stream = serf_fcgi__stream_get(fcgi, sid, FALSE, FALSE); + if (!stream) { + return SERF_ERROR_FCGI_PROTOCOL_ERROR; + } + + body = serf_fcgi__stream_handle_stdin(stream, body, + fcgi->allocator); + + if (body) { + /* We will take care of discarding */ + process_bucket = body; + } + else + { + /* The stream wants to handle the reading itself */ + process_handler = serf_fcgi__stream_processor; + process_baton = stream; + } break; case FCGI_FRAMETYPE(FCGI_V1, FCGI_STDOUT): process_bucket = body; @@ -263,6 +347,11 @@ static apr_status_t fcgi_read(serf_fcgi_ static apr_status_t fcgi_write(serf_fcgi_protocol_t *fcgi) { + return APR_SUCCESS; +} + +static apr_status_t fcgi_hangup(serf_fcgi_protocol_t *fcgi) +{ return APR_ENOTIMPL; } @@ -271,6 +360,52 @@ static apr_status_t fcgi_teardown(serf_f return APR_ENOTIMPL; } +static void +move_to_head(serf_fcgi_stream_t *stream) +{ + /* Not implemented yet */ +} + +serf_fcgi_stream_t * +serf_fcgi__stream_get(serf_fcgi_protocol_t *fcgi, + apr_uint16_t streamid, + bool create, + bool move_first) +{ + serf_fcgi_stream_t *stream; + + if (streamid == 0) + return NULL; + + for (stream = fcgi->first; stream; stream = stream->next) + { + if (stream->streamid == streamid) + { + if (move_first && stream != fcgi->first) + move_to_head(stream); + + return stream; + } + } + + if (create) + { + stream = serf_fcgi__stream_create(fcgi, streamid, fcgi->allocator); + + if (fcgi->first) + { + stream->next = fcgi->first; + fcgi->first->prev = stream; + fcgi->first = stream; + } + else + fcgi->last = fcgi->first = stream; + + return stream; + } + return NULL; +} + /* --------------- connection support --------------- */ static apr_status_t fcgi_outgoing_read(serf_connection_t *conn) @@ -287,21 +422,21 @@ static apr_status_t fcgi_outgoing_write( { serf_fcgi_protocol_t *fcgi = conn->protocol_baton; - return fcgi_read(fcgi); + return fcgi_teardown(fcgi); } static apr_status_t fcgi_outgoing_hangup(serf_connection_t *conn) { serf_fcgi_protocol_t *fcgi = conn->protocol_baton; - return fcgi_read(fcgi); + return fcgi_teardown(fcgi); } static apr_status_t fcgi_outgoing_teardown(serf_connection_t *conn) { serf_fcgi_protocol_t *fcgi = conn->protocol_baton; - return fcgi_read(fcgi); + return fcgi_teardown(fcgi); } void serf__fcgi_protocol_init(serf_connection_t *conn) @@ -353,21 +488,26 @@ static apr_status_t fcgi_server_write(se { serf_fcgi_protocol_t *fcgi = client->protocol_baton; - return fcgi_read(fcgi); + if (!fcgi->stream) { + fcgi->stream = client->stream; + fcgi->ostream = client->ostream_tail; + } + + return fcgi_write(fcgi); } static apr_status_t fcgi_server_hangup(serf_incoming_t *client) { serf_fcgi_protocol_t *fcgi = client->protocol_baton; - return fcgi_read(fcgi); + return fcgi_hangup(fcgi); } static apr_status_t fcgi_server_teardown(serf_incoming_t *client) { serf_fcgi_protocol_t *fcgi = client->protocol_baton; - return fcgi_read(fcgi); + return fcgi_teardown(fcgi); } void serf__fcgi_protocol_init_server(serf_incoming_t *client) Modified: serf/trunk/protocols/fcgi_protocol.h URL: http://svn.apache.org/viewvc/serf/trunk/protocols/fcgi_protocol.h?rev=1714774&r1=1714773&r2=1714774&view=diff ============================================================================== --- serf/trunk/protocols/fcgi_protocol.h (original) +++ serf/trunk/protocols/fcgi_protocol.h Tue Nov 17 12:49:11 2015 @@ -46,6 +46,37 @@ typedef struct serf_fcgi_stream_data_t s #define FCGI_V1 0x1 +/* From protocol specs */ +/* +* Listening socket file number +*/ +#define FCGI_LISTENSOCK_FILENO 0 + +typedef struct FCGI_Header { + unsigned char version; + unsigned char type; + unsigned char requestIdB1; + unsigned char requestIdB0; + unsigned char contentLengthB1; + unsigned char contentLengthB0; + unsigned char paddingLength; + unsigned char reserved; +} FCGI_Header; + +/* + * Number of bytes in a FCGI_Header. Future versions of the protocol + * will not reduce this number. + */ +#define FCGI_HEADER_LEN 8 + +/* + * Value for version component of FCGI_Header + */ +#define FCGI_VERSION_1 1 + +/* + * Values for type component of FCGI_Header + */ #define FCGI_BEGIN_REQUEST 1 #define FCGI_ABORT_REQUEST 2 #define FCGI_END_REQUEST 3 @@ -59,12 +90,85 @@ typedef struct serf_fcgi_stream_data_t s #define FCGI_UNKNOWN_TYPE 11 #define FCGI_MAXTYPE (FCGI_UNKNOWN_TYPE) +/* +* Value for requestId component of FCGI_Header +*/ +#define FCGI_NULL_REQUEST_ID 0 + +typedef struct FCGI_BeginRequestBody { + unsigned char roleB1; + unsigned char roleB0; + unsigned char flags; + unsigned char reserved[5]; +} FCGI_BeginRequestBody; + +typedef struct FCGI_BeginRequestRecord { + FCGI_Header header; + FCGI_BeginRequestBody body; +} FCGI_BeginRequestRecord; + +/* + * Mask for flags component of FCGI_BeginRequestBody + */ +#define FCGI_KEEP_CONN 1 + +/* + * Values for role component of FCGI_BeginRequestBody + */ +#define FCGI_RESPONDER 1 +#define FCGI_AUTHORIZER 2 +#define FCGI_FILTER 3 + +typedef struct FCGI_EndRequestBody { + unsigned char appStatusB3; + unsigned char appStatusB2; + unsigned char appStatusB1; + unsigned char appStatusB0; + unsigned char protocolStatus; + unsigned char reserved[3]; +} FCGI_EndRequestBody; + +typedef struct FCGI_EndRequestRecord { + FCGI_Header header; + FCGI_EndRequestBody body; +} FCGI_EndRequestRecord; + +/* +* Values for protocolStatus component of FCGI_EndRequestBody +*/ +#define FCGI_REQUEST_COMPLETE 0 +#define FCGI_CANT_MPX_CONN 1 +#define FCGI_OVERLOADED 2 +#define FCGI_UNKNOWN_ROLE 3 + +/* +* Variable names for FCGI_GET_VALUES / FCGI_GET_VALUES_RESULT records +*/ +#define FCGI_MAX_CONNS "FCGI_MAX_CONNS" +#define FCGI_MAX_REQS "FCGI_MAX_REQS" +#define FCGI_MPXS_CONNS "FCGI_MPXS_CONNS" + +typedef struct FCGI_UnknownTypeBody { + unsigned char type; + unsigned char reserved[7]; +} FCGI_UnknownTypeBody; + +typedef struct FCGI_UnknownTypeRecord { + FCGI_Header header; + FCGI_UnknownTypeBody body; +} FCGI_UnknownTypeRecord; + + +/**************************************************/ typedef struct serf_fcgi_stream_t { - struct serf_fcgi_protocol_t *h2; + struct serf_fcgi_protocol_t *fcgi; serf_bucket_alloc_t *alloc; + apr_uint16_t streamid; + apr_uint16_t role; + /* Opaque implementation details */ serf_fcgi_stream_data_t *data; @@ -78,6 +182,31 @@ typedef apr_status_t(*serf_fcgi_processo serf_bucket_t *body); +/* From fcgi_protocol.c */ +serf_fcgi_stream_t * serf_fcgi__stream_get(serf_fcgi_protocol_t *fcgi, + apr_uint16_t streamid, + bool create, + bool move_first); + + +/* From fcgi_stream.c */ +serf_fcgi_stream_t * serf_fcgi__stream_create(serf_fcgi_protocol_t *fcgi, + apr_uint16_t streamid, + serf_bucket_alloc_t *alloc); + +apr_status_t serf_fcgi__stream_processor(void *baton, + serf_fcgi_protocol_t *fcgi, + serf_bucket_t *body); + +serf_bucket_t * serf_fcgi__stream_handle_params(serf_fcgi_stream_t *stream, + serf_bucket_t *body, + serf_bucket_alloc_t *alloc); + +serf_bucket_t * serf_fcgi__stream_handle_stdin(serf_fcgi_stream_t *stream, + serf_bucket_t *body, + serf_bucket_alloc_t *alloc); + + #ifdef __cplusplus } Modified: serf/trunk/protocols/fcgi_stream.c URL: http://svn.apache.org/viewvc/serf/trunk/protocols/fcgi_stream.c?rev=1714774&r1=1714773&r2=1714774&view=diff ============================================================================== --- serf/trunk/protocols/fcgi_stream.c (original) +++ serf/trunk/protocols/fcgi_stream.c Tue Nov 17 12:49:11 2015 @@ -32,26 +32,167 @@ struct serf_fcgi_stream_data_t { - int dummy; + serf_bucket_t *req_agg; + bool headers_eof; + bool stdin_eof; + + serf_request_t *request; + serf_incoming_request_t *in_request; }; -serf_fcgi_stream_t * -serf_fcgi__stream_create(serf_fcgi_protocol_t *h2, - apr_int32_t streamid, - apr_uint32_t lr_window, - apr_uint32_t rl_window, - serf_bucket_alloc_t *alloc) +serf_fcgi_stream_t * serf_fcgi__stream_create(serf_fcgi_protocol_t *fcgi, + apr_uint16_t streamid, + serf_bucket_alloc_t *alloc) { serf_fcgi_stream_t *stream = serf_bucket_mem_alloc(alloc, sizeof(*stream)); - stream->h2 = h2; + stream->fcgi = fcgi; stream->alloc = alloc; + stream->streamid = streamid; stream->next = stream->prev = NULL; /* Delay creating this? */ - stream->data = serf_bucket_mem_alloc(alloc, sizeof(*stream->data)); + stream->data = serf_bucket_mem_calloc(alloc, sizeof(*stream->data)); return stream; } + +/* Aggregate hold open callback for what requests will think is the + actual body */ +static apr_status_t stream_agg_eof(void *baton, + serf_bucket_t *bucket) +{ + serf_fcgi_stream_t *stream = baton; + + if (!stream->data->stdin_eof) + return APR_EAGAIN; + + return APR_EOF; +} + +serf_bucket_t * serf_fcgi__stream_handle_params(serf_fcgi_stream_t *stream, + serf_bucket_t *body, + serf_bucket_alloc_t *alloc) +{ + apr_size_t remaining; + if (!stream->data->req_agg) { + stream->data->req_agg = serf_bucket_aggregate_create(stream->alloc); + + serf_bucket_aggregate_hold_open(stream->data->req_agg, + stream_agg_eof, stream); + } + + remaining = (apr_size_t)serf_bucket_get_remaining(body); + + if (remaining) { + body = serf__bucket_fcgi_params_decode_create(body, body->allocator); + } + else { + stream->data->headers_eof = true; + } + + /* And add it to our aggregate in both cases */ + serf_bucket_aggregate_append(stream->data->req_agg, body); + + return NULL; +} + +serf_bucket_t * serf_fcgi__stream_handle_stdin(serf_fcgi_stream_t *stream, + serf_bucket_t *body, + serf_bucket_alloc_t *alloc) +{ + apr_size_t remaining; + SERF_FCGI_assert(stream->data->headers_eof); + if (!stream->data->req_agg) { + stream->data->req_agg = serf_bucket_aggregate_create(stream->alloc); + + serf_bucket_aggregate_hold_open(stream->data->req_agg, + stream_agg_eof, stream); + } + + remaining = (apr_size_t)serf_bucket_get_remaining(body); + + if (!remaining) { + stream->data->stdin_eof = true; + } + + /* And add it to our aggregate in both cases */ + serf_bucket_aggregate_append(stream->data->req_agg, body); + + return NULL; +} + + +apr_status_t serf_fcgi__stream_processor(void *baton, + serf_fcgi_protocol_t *fcgi, + serf_bucket_t *body) +{ + serf_fcgi_stream_t *stream = baton; + apr_status_t status = APR_SUCCESS; + + SERF_FCGI_assert(stream->data->req_agg != NULL); + + if (stream->data->request) { + /* ### TODO */ + } + else if (stream->data->in_request) { + serf_incoming_request_t *request = stream->data->in_request; + + SERF_FCGI_assert(request->req_bkt != NULL); + + status = request->handler(request, request->req_bkt, + request->handler_baton, + request->pool); + + if (!APR_STATUS_IS_EOF(status) + && !SERF_BUCKET_READ_ERROR(status)) + return status; + + if (APR_STATUS_IS_EOF(status)) { + status = serf_incoming_response_create(request); + + if (status) + return status; + } + + if (SERF_BUCKET_READ_ERROR(status)) { + + /* SEND ERROR status */ + + return status; + } + } + + while (!status) + { + struct iovec vecs[IOV_MAX]; + int vecs_used; + + /* Drain the bucket as efficiently as possible */ + status = serf_bucket_read_iovec(stream->data->req_agg, + SERF_READ_ALL_AVAIL, + IOV_MAX, vecs, &vecs_used); + + if (vecs_used) { + /* We have data... What should we do with it? */ + /*int i; + + for (i = 0; i < vecs_used; i++) + fprintf(stderr, "%.*s", vecs[i].iov_len, vecs[i].iov_base);*/ + } + } + + if (APR_STATUS_IS_EOF(status)) + { + /* If there was a request, it is already gone, so we can now safely + destroy our aggregate which may include everything upto the http2 + frames */ + serf_bucket_destroy(stream->data->req_agg); + stream->data->req_agg = NULL; + } + + return status; +} +