Author: rhuijben Date: Tue Nov 17 16:07:48 2015 New Revision: 1714818 URL: http://svn.apache.org/viewvc?rev=1714818&view=rev Log: Start sending a response back to the fcgi server. This will actually make the result visible in a web browser when using an httpd configured with
ProxyPass "/d/fcgi/" "fcgi://127.0.0.1:4000/" enablereuse=on And a serf_httpd run as $ serf_httpd --listen fcgi,172.24.0.11:4000 /my/root * buckets/fcgi_buckets.c (fcgi_frame_ctx_t): New struct. (serf__bucket_fcgi_frame_create): Extend arguments. (serf_fcgi_frame_refill): New function. (serf_fcgi_frame_read, serf_fcgi_frame_read_iovec, serf_fcgi_frame_peek, serf_fcgi_frame_destroy, serf_fcgi_frame_get_remaining, serf_fcgi_frame_set_config): Implement. * protocols/fcgi_buckets.h (serf__bucket_fcgi_frame_create): Add two arguments. * protocols/fcgi_protocol.c (serf_fcgi__enqueue_frame): New function. Based on http2 version. (fcgi_write): Implement. * protocols/fcgi_protocol.h (serf_fcgi__enqueue_frame): New function. * protocols/fcgi_stream.c (includes): Add apr_date.h. (fcgi_stream_enqueue_response): Create proper response. * test/serf_httpd.c (request_generate_response): Remove chunked encoding from http/1 testing. This doesn't belong on this layer. Modified: serf/trunk/buckets/fcgi_buckets.c serf/trunk/protocols/fcgi_buckets.h serf/trunk/protocols/fcgi_protocol.c serf/trunk/protocols/fcgi_protocol.h serf/trunk/protocols/fcgi_stream.c serf/trunk/test/serf_httpd.c Modified: serf/trunk/buckets/fcgi_buckets.c URL: http://svn.apache.org/viewvc/serf/trunk/buckets/fcgi_buckets.c?rev=1714818&r1=1714817&r2=1714818&view=diff ============================================================================== --- serf/trunk/buckets/fcgi_buckets.c (original) +++ serf/trunk/buckets/fcgi_buckets.c Tue Nov 17 16:07:48 2015 @@ -600,23 +600,165 @@ extern const serf_bucket_type_t serf_buc serf_default_ignore_config }; /* ==================================================================== */ +typedef struct fcgi_frame_ctx_t +{ + serf_bucket_t *stream; + serf_bucket_t *agg; + apr_uint16_t stream_id; + apr_uint16_t frame_type; + bool send_stream; + bool send_eof; + bool at_eof; + + char record_data[FCGI_RECORD_SIZE]; +} fcgi_frame_ctx_t; + serf_bucket_t * 
serf__bucket_fcgi_frame_create(serf_bucket_t *stream, apr_uint16_t stream_id, apr_uint16_t frame_type, + bool send_as_stream, + bool send_eof, serf_bucket_alloc_t *alloc) { - return NULL; + fcgi_frame_ctx_t *ctx; + + ctx = serf_bucket_mem_alloc(alloc, sizeof(*ctx)); + ctx->stream = stream; + ctx->stream_id = stream_id; + ctx->frame_type = frame_type; + ctx->send_stream = send_as_stream; + ctx->send_eof = send_eof; + ctx->at_eof = false; + ctx->agg = NULL; + + return serf_bucket_create(&serf_bucket_type__fcgi_frame, alloc, ctx); } +static apr_status_t serf_fcgi_frame_refill(serf_bucket_t *bucket) +{ + fcgi_frame_ctx_t *ctx = bucket->data; + apr_status_t status; + serf_bucket_t *head = NULL; + apr_size_t payload; + + if (ctx->at_eof) + return APR_EOF; + + if (!ctx->agg) + ctx->agg = serf_bucket_aggregate_create(bucket->allocator); + + if (!ctx->stream) + { + payload = 0; + } + else if (ctx->send_stream) + { + apr_uint64_t remaining; + serf_bucket_split_create(&head, &ctx->stream, ctx->stream, + 8192, 0xFFF0); /* Some guesses */ + + remaining = serf_bucket_get_remaining(head); + if (remaining != SERF_LENGTH_UNKNOWN) { + serf_bucket_aggregate_append(ctx->agg, head); + payload = (apr_size_t)remaining; + } + else if (remaining == 0) + { + payload = 0; + serf_bucket_destroy(head); + ctx->at_eof = true; + } + else + { + struct iovec vecs[IOV_MAX]; + int vecs_used; + + status = serf_bucket_read_iovec(head, 0xFFF0, + IOV_MAX, vecs, &vecs_used); + + if (SERF_BUCKET_READ_ERROR(status)) + return status; + + if (vecs_used) { + serf_bucket_aggregate_append_iovec(ctx->agg, vecs, vecs_used); + payload = (apr_size_t)serf_bucket_get_remaining(ctx->agg); + + if (APR_STATUS_IS_EOF(status)) { + /* Keep head alive by appending it to the aggregate */ + serf_bucket_aggregate_append(ctx->agg, head); + } + else { + /* Write a new record for the remaining part of head :( */ + serf_bucket_aggregate_append(ctx->agg, + serf__bucket_fcgi_frame_create(head, ctx->stream_id, + ctx->frame_type, + 
true /* send_as_stream */, + false /* send_eof */, + bucket->allocator)); + } + } + else + payload = 0; + + if (APR_STATUS_IS_EOF(status)) + ctx->at_eof = true; + } + + if (!payload && !ctx->send_eof) + return APR_SUCCESS; + } + else + { + abort(); /* ### TODO */ + } + + /* Create FCGI record */ + ctx->record_data[0] = (ctx->frame_type >> 8); + ctx->record_data[1] = (ctx->frame_type & 0xFF); + ctx->record_data[2] = (ctx->stream_id >> 8); + ctx->record_data[3] = (ctx->stream_id & 0xFF); + ctx->record_data[4] = (payload >> 8) & 0xFF; + ctx->record_data[5] = (payload & 0xFF); + ctx->record_data[6] = 0; /* padding */ + ctx->record_data[7] = 0; /* reserved */ + + serf_bucket_aggregate_prepend(ctx->agg, + serf_bucket_simple_create(ctx->record_data, + FCGI_RECORD_SIZE, + NULL, NULL, + bucket->allocator)); + return APR_SUCCESS; +} static apr_status_t serf_fcgi_frame_read(serf_bucket_t *bucket, apr_size_t requested, const char **data, apr_size_t *len) { - return APR_ENOTIMPL; + fcgi_frame_ctx_t *ctx = bucket->data; + apr_status_t status; + + if (ctx->agg) { + status = serf_bucket_read(ctx->agg, requested, data, len); + + if (!APR_STATUS_IS_EOF(status)) + return status; + } + + status = serf_fcgi_frame_refill(bucket); + if (status) { + *len = 0; + return status; + } + + status = serf_bucket_read(ctx->agg, requested, data, len); + + if (APR_STATUS_IS_EOF(status) && !ctx->at_eof) + status = APR_SUCCESS; + + return status; } static apr_status_t serf_fcgi_frame_read_iovec(serf_bucket_t *bucket, @@ -625,30 +767,89 @@ static apr_status_t serf_fcgi_frame_read struct iovec *vecs, int *vecs_used) { - return APR_ENOTIMPL; + fcgi_frame_ctx_t *ctx = bucket->data; + apr_status_t status; + + if (ctx->agg) { + status = serf_bucket_read_iovec(ctx->agg, requested, vecs_size, + vecs, vecs_used); + + if (!APR_STATUS_IS_EOF(status)) + return status; + } + + status = serf_fcgi_frame_refill(bucket); + if (status) { + *vecs_used = 0; + return status; + } + + status = 
serf_bucket_read_iovec(ctx->agg, requested, vecs_size, + vecs, vecs_used); + + if (APR_STATUS_IS_EOF(status) && !ctx->at_eof) + status = APR_SUCCESS; + + return status; } static apr_status_t serf_fcgi_frame_peek(serf_bucket_t *bucket, const char **data, apr_size_t *len) { - return APR_ENOTIMPL; + fcgi_frame_ctx_t *ctx = bucket->data; + apr_status_t status; + + if (ctx->agg) { + status = serf_bucket_peek(ctx->agg, data, len); + + if (!APR_STATUS_IS_EOF(status)) + return status; + } + + status = serf_fcgi_frame_refill(bucket); + if (status) { + *len = 0; + return status; + } + + status = serf_bucket_peek(ctx->agg, data, len); + + if (APR_STATUS_IS_EOF(status) && !ctx->at_eof) + status = APR_SUCCESS; + + return status; } static void serf_fcgi_frame_destroy(serf_bucket_t *bucket) { + fcgi_frame_ctx_t *ctx = bucket->data; + + if (ctx->agg) + serf_bucket_destroy(ctx->agg); + if (ctx->stream) + serf_bucket_destroy(ctx->stream); + serf_default_destroy_and_data(bucket); } static apr_uint64_t serf_fcgi_frame_get_remaining(serf_bucket_t *bucket) { - return APR_ENOTIMPL; + return SERF_LENGTH_UNKNOWN; } static apr_status_t serf_fcgi_frame_set_config(serf_bucket_t *bucket, - serf_config_t *config) + serf_config_t *config) { - return APR_ENOTIMPL; + fcgi_frame_ctx_t *ctx = bucket->data; + + if (!ctx->agg) + ctx->agg = serf_bucket_aggregate_create(bucket->allocator); + serf_bucket_set_config(ctx->agg, config); + if (ctx->stream) + serf_bucket_set_config(ctx->stream, config); + + return APR_SUCCESS; } extern const serf_bucket_type_t serf_bucket_type__fcgi_frame = Modified: serf/trunk/protocols/fcgi_buckets.h URL: http://svn.apache.org/viewvc/serf/trunk/protocols/fcgi_buckets.h?rev=1714818&r1=1714817&r2=1714818&view=diff ============================================================================== --- serf/trunk/protocols/fcgi_buckets.h (original) +++ serf/trunk/protocols/fcgi_buckets.h Tue Nov 17 16:07:48 2015 @@ -83,6 +83,8 @@ serf_bucket_t * 
serf__bucket_fcgi_frame_create(serf_bucket_t *stream, apr_uint16_t stream_id, apr_uint16_t frame_type, + bool send_as_stream, + bool send_eof, serf_bucket_alloc_t *alloc); #ifdef __cplusplus Modified: serf/trunk/protocols/fcgi_protocol.c URL: http://svn.apache.org/viewvc/serf/trunk/protocols/fcgi_protocol.c?rev=1714818&r1=1714817&r2=1714818&view=diff ============================================================================== --- serf/trunk/protocols/fcgi_protocol.c (original) +++ serf/trunk/protocols/fcgi_protocol.c Tue Nov 17 16:07:48 2015 @@ -347,8 +347,76 @@ static apr_status_t fcgi_read(serf_fcgi_ return APR_SUCCESS; } +apr_status_t serf_fcgi__enqueue_frame(serf_fcgi_protocol_t *fcgi, + serf_bucket_t *frame, + bool pump) +{ + apr_status_t status; + bool want_write; + + if (!pump && !*fcgi->dirty_pollset) + { + const char *data; + apr_size_t len; + + /* Cheap check to see if we should request a write + event next time around */ + status = serf_bucket_peek(fcgi->ostream, &data, &len); + + if (SERF_BUCKET_READ_ERROR(status)) + { + serf_bucket_destroy(frame); + return status; + } + + if (len == 0) + { + *fcgi->dirty_pollset = true; + fcgi->ctx->dirty_pollset = true; + } + } + + serf_bucket_aggregate_append(fcgi->ostream, frame); + + if (!pump) + return APR_SUCCESS; + + /* Flush final output buffer (after ssl, etc.) 
*/ + if (fcgi->conn) + status = serf__connection_flush(fcgi->conn, TRUE); + else + status = serf__incoming_client_flush(fcgi->client, TRUE); + + want_write = APR_STATUS_IS_EAGAIN(status); + + if ((want_write && !(*fcgi->req_events & APR_POLLOUT)) + || (!want_write && (*fcgi->req_events & APR_POLLOUT))) + { + *fcgi->dirty_pollset = true; + fcgi->ctx->dirty_pollset = true; + } + + return status; +} + static apr_status_t fcgi_write(serf_fcgi_protocol_t *fcgi) { + apr_status_t status; + + if (fcgi->client) + status = serf__incoming_client_flush(fcgi->client, true); + else + status = serf__connection_flush(fcgi->conn, true); + + if (APR_STATUS_IS_EAGAIN(status)) + return APR_SUCCESS; + else if (status) + return status; + + /* Probably nothing to write. */ + *fcgi->dirty_pollset = true; + fcgi->ctx->dirty_pollset = true; + return APR_SUCCESS; } Modified: serf/trunk/protocols/fcgi_protocol.h URL: http://svn.apache.org/viewvc/serf/trunk/protocols/fcgi_protocol.h?rev=1714818&r1=1714817&r2=1714818&view=diff ============================================================================== --- serf/trunk/protocols/fcgi_protocol.h (original) +++ serf/trunk/protocols/fcgi_protocol.h Tue Nov 17 16:07:48 2015 @@ -195,6 +195,10 @@ apr_status_t serf_fcgi__setup_incoming_r void **req_setup_baton, serf_fcgi_protocol_t *fcgi); +apr_status_t serf_fcgi__enqueue_frame(serf_fcgi_protocol_t *fcgi, + serf_bucket_t *frame, + bool pump); + /* From fcgi_stream.c */ serf_fcgi_stream_t * serf_fcgi__stream_create(serf_fcgi_protocol_t *fcgi, Modified: serf/trunk/protocols/fcgi_stream.c URL: http://svn.apache.org/viewvc/serf/trunk/protocols/fcgi_stream.c?rev=1714818&r1=1714817&r2=1714818&view=diff ============================================================================== --- serf/trunk/protocols/fcgi_stream.c (original) +++ serf/trunk/protocols/fcgi_stream.c Tue Nov 17 16:07:48 2015 @@ -22,6 +22,7 @@ #include <apr_pools.h> #include <apr_strings.h> +#include <apr_date.h> #include "serf.h" #include 
"serf_bucket_util.h" @@ -77,12 +78,59 @@ fcgi_stream_enqueue_response(serf_incomi void *enqueue_baton, serf_bucket_t *response_bkt) { - return APR_SUCCESS; + serf_fcgi_stream_t *stream = enqueue_baton; + serf_bucket_alloc_t *alloc = response_bkt->allocator; + serf_linebuf_t *linebuf; + serf_bucket_t *agg; + serf_bucket_t *tmp; + apr_status_t status; + + /* With FCGI we don't send the usual first line of the response. + We just send a "Status: 200" instead and the actual http + server will handle the rest */ + agg = serf_bucket_aggregate_create(alloc); + + /* Too big for the stack :( */ + linebuf = serf_bucket_mem_alloc(alloc, sizeof(*linebuf)); + serf_linebuf_init(linebuf); + + do + { + status = serf_linebuf_fetch(linebuf, response_bkt, SERF_NEWLINE_ANY); + } while (status == APR_SUCCESS && linebuf->state != SERF_LINEBUF_READY); + + if (status + || linebuf->state != SERF_LINEBUF_READY + || !apr_date_checkmask(linebuf->line, "HTTP/#.# ###*")) + { + /* We can't write a response in this state yet :( */ + serf_bucket_mem_free(alloc, linebuf); + return status; + } + + tmp = SERF_BUCKET_SIMPLE_STRING("Status: ", alloc); + serf_bucket_aggregate_append(agg, tmp); + + tmp = serf_bucket_simple_copy_create(linebuf->line + 9, 3, alloc); + serf_bucket_aggregate_append(agg, tmp); + serf_bucket_mem_free(alloc, linebuf); + + tmp = SERF_BUCKET_SIMPLE_STRING("\r\n", alloc); + serf_bucket_aggregate_append(agg, tmp); + + serf_bucket_aggregate_append(agg, response_bkt); + + return serf_fcgi__enqueue_frame( + stream->fcgi, + serf__bucket_fcgi_frame_create(agg, stream->streamid, + FCGI_FRAMETYPE(FCGI_V1, FCGI_STDOUT), + true, true, + alloc), true); } static apr_status_t stream_setup_request(serf_fcgi_stream_t *stream, - serf_config_t *config) + serf_config_t *config) { serf_bucket_t *agg; apr_status_t status; Modified: serf/trunk/test/serf_httpd.c URL: http://svn.apache.org/viewvc/serf/trunk/test/serf_httpd.c?rev=1714818&r1=1714817&r2=1714818&view=diff 
============================================================================== --- serf/trunk/test/serf_httpd.c (original) +++ serf/trunk/test/serf_httpd.c Tue Nov 17 16:07:48 2015 @@ -141,9 +141,6 @@ static apr_status_t request_generate_res tmp = SERF_BUCKET_SIMPLE_STRING("HTTP/1.1 200 OK" CRLF, alloc); serf_bucket_aggregate_append(agg, tmp); - tmp = SERF_BUCKET_SIMPLE_STRING("Transfer-Encoding: chunked" CRLF, alloc); - serf_bucket_aggregate_append(agg, tmp); - tmp = SERF_BUCKET_SIMPLE_STRING("Content-Type: text/plain" CRLF, alloc); serf_bucket_aggregate_append(agg, tmp); @@ -173,7 +170,7 @@ static apr_status_t request_generate_res tmp = SERF_BUCKET_SIMPLE_STRING(CRLF, alloc); serf_bucket_aggregate_append(body, tmp); - tmp = serf_bucket_chunk_create(body, alloc); + tmp = body; serf_bucket_aggregate_append(agg, tmp); tmp = SERF_BUCKET_SIMPLE_STRING(CRLF, alloc);