Author: rhuijben
Date: Mon Nov 16 10:32:49 2015
New Revision: 1714539
URL: http://svn.apache.org/viewvc?rev=1714539&view=rev
Log:
Add a basic 'incoming request bucket' that can read HTTP/1-style requests
as they would be received on the server. This should be the exact counterpart
of the current request bucket, but it is currently still a bit limited.
(The next commit will actually use this on the server side.)
* buckets/request_buckets.c
(incoming_rq_status_t): New enum.
(incoming_request_context_t): New struct.
(serf_bucket_incoming_request_create,
serf_incoming_rq_parse_rqline,
serf_incoming_rq_parse_headerline,
serf_incoming_rq_wait_for,
serf_incoming_rq_read,
serf_incoming_rq_readline,
serf_incoming_rq_read_iovec,
serf_incoming_rq_peek,
serf_incoming_rq_destroy,
serf_bucket_incoming_request_read,
serf_bucket_incoming_request_wait_for_headers): New functions.
(serf_bucket_type_incoming_request): New bucket type.
* serf_bucket_types.h
(serf_bucket_type_incoming_request): New constant.
(SERF_BUCKET_IS_INCOMING_REQUEST): New define.
(serf_bucket_incoming_request_create,
serf_bucket_incoming_request_read,
serf_bucket_incoming_request_wait_for_headers): New functions.
Modified:
serf/trunk/buckets/request_buckets.c
serf/trunk/serf_bucket_types.h
Modified: serf/trunk/buckets/request_buckets.c
URL:
http://svn.apache.org/viewvc/serf/trunk/buckets/request_buckets.c?rev=1714539&r1=1714538&r2=1714539&view=diff
==============================================================================
--- serf/trunk/buckets/request_buckets.c (original)
+++ serf/trunk/buckets/request_buckets.c Mon Nov 16 10:32:49 2015
@@ -276,3 +276,351 @@ const serf_bucket_type_t serf_bucket_typ
serf_request_set_config,
};
+typedef enum incoming_rq_status_t /* Parser states; the order matters because the code advances with state++ and compares with '<'. */
+{
+ STATE_INIT, /* Request line not parsed yet. */
+ STATE_HEADERS, /* Parsing header lines until an empty line. */
+ STATE_PREBODY, /* Headers complete; body bucket is chosen here. */
+ STATE_BODY, /* Body can be read through ctx->body. */
+ STATE_TRAILERS, /* Body hit EOF; parsing trailer lines. */
+ STATE_DONE /* Whole request consumed; wait_for reports APR_EOF. */
+} incoming_rq_status_t;
+
+typedef struct incoming_request_context_t { /* Per-bucket state of the incoming request parser. */
+ const char *method; /* Copy of the first request line field; freed in destroy. */
+ const char *path_raw; /* Copy of the second request line field; freed in destroy. */
+ int version; /* Currently always set to SERF_HTTP_11. */
+
+ serf_bucket_t *stream; /* Source bucket the request is read from. */
+ serf_bucket_t *headers; /* Headers bucket filled while parsing header lines. */
+ serf_bucket_t *body; /* NULL until STATE_PREBODY ran; then stream or a wrapper around it. */
+
+ incoming_rq_status_t state; /* Current parser state. */
+ bool expect_trailers; /* Set when the body is chunked; EOF of the body then leads to STATE_TRAILERS. */
+
+ /* Buffer for accumulating a line from the request. */
+ serf_linebuf_t linebuf;
+
+} incoming_request_context_t;
+
+/* Create an incoming-request bucket that will parse a request read from
+   STREAM. The context, and the initially empty headers bucket, are
+   allocated from ALLOCATOR. */
+serf_bucket_t *serf_bucket_incoming_request_create(
+    serf_bucket_t *stream,
+    serf_bucket_alloc_t *allocator)
+{
+    incoming_request_context_t *rq_ctx =
+        serf_bucket_mem_calloc(allocator, sizeof(incoming_request_context_t));
+
+    /* Everything not set here stays zeroed by the calloc above. */
+    rq_ctx->state = STATE_INIT;
+    rq_ctx->stream = stream;
+    rq_ctx->headers = serf_bucket_headers_create(allocator);
+    serf_linebuf_init(&rq_ctx->linebuf);
+
+    return serf_bucket_create(&serf_bucket_type_incoming_request,
+                              allocator, rq_ctx);
+}
+
+/* Parse the request line currently held in the line buffer.
+
+   The line is split at its first two spaces: the text before the first
+   space becomes ctx->method, the text between the two spaces becomes
+   ctx->path_raw (both are copies made with the bucket allocator). The
+   remainder (the HTTP version) is not parsed yet. On success the parser
+   state is advanced by one.
+
+   Returns SERF_ERROR_TRUNCATED_STREAM when the line is empty or does not
+   contain two spaces; nothing is stored in the context in that case. */
+static apr_status_t serf_incoming_rq_parse_rqline(serf_bucket_t *bucket)
+{
+    incoming_request_context_t *ctx = bucket->data;
+    const char *line = ctx->linebuf.line;
+    const char *line_end = line + ctx->linebuf.used;
+    const char *spc, *spc2;
+
+    if (ctx->linebuf.used == 0) {
+        return SERF_ERROR_TRUNCATED_STREAM;
+    }
+
+    /* ### This may need some security review if this is used in production
+       code */
+    spc = memchr(line, ' ', ctx->linebuf.used);
+    if (!spc)
+        return SERF_ERROR_TRUNCATED_STREAM;
+
+    /* Search only the bytes that remain after the first space; a larger
+       count would make memchr read past the end of the buffered line. */
+    spc2 = memchr(spc + 1, ' ', line_end - (spc + 1));
+    if (!spc2)
+        return SERF_ERROR_TRUNCATED_STREAM;
+
+    /* Both separators were found, so it is now safe to store the copies. */
+    ctx->method = serf_bstrmemdup(bucket->allocator, line, spc - line);
+    ctx->path_raw = serf_bstrmemdup(bucket->allocator, spc + 1,
+                                    spc2 - spc - 1);
+
+    ctx->version = SERF_HTTP_11; /* ### Parse! */
+    ctx->state++;
+
+    return APR_SUCCESS;
+}
+
+/* Parse the header (or trailer) line currently held in the line buffer.
+
+   An empty line ends the current header block and advances the parser
+   state by one. Any other line is split at its first ':'; the name before
+   it and the value after it (with leading spaces and tabs skipped) are
+   copied into ctx->headers.
+
+   Returns SERF_ERROR_TRUNCATED_STREAM for a non-empty line without ':',
+   matching how a malformed request line is reported in this file. */
+static apr_status_t serf_incoming_rq_parse_headerline(serf_bucket_t *bucket)
+{
+    incoming_request_context_t *ctx = bucket->data;
+    const char *line = ctx->linebuf.line;
+    const char *line_end = line + ctx->linebuf.used;
+    const char *split;
+    const char *value;
+
+    if (ctx->linebuf.used == 0) {
+        ctx->state++;
+        return APR_SUCCESS;
+    }
+
+    split = memchr(line, ':', ctx->linebuf.used);
+    if (!split)
+        return SERF_ERROR_TRUNCATED_STREAM;
+
+    /* Do not assume that exactly one space follows the colon: the value
+       may start right away, be preceded by several blanks, or be empty.
+       Staying below line_end keeps the value length from underflowing. */
+    value = split + 1;
+    while (value < line_end && (*value == ' ' || *value == '\t'))
+        value++;
+
+    serf_bucket_headers_setx(ctx->headers,
+                             line, split - line,
+                             TRUE /* copy */,
+                             value, line_end - value,
+                             TRUE /* copy */);
+
+    return APR_SUCCESS;
+}
+
+/* Advance the request parser until ctx->state reaches WAIT_FOR.
+
+   Asking for STATE_BODY while the parser is already in STATE_TRAILERS is
+   treated as asking for STATE_DONE, so the trailers get consumed before
+   the end of the request is reported.
+
+   Returns APR_SUCCESS when the requested state is reached, APR_EOF when
+   the parser is in STATE_DONE on exit, or the first non-zero status
+   reported while fetching or parsing (the parser state is left unchanged
+   in that case). An unusable Content-Length value is reported as
+   APR_FROM_OS_ERROR(ERANGE) on overflow and APR_EINVAL otherwise. */
+static apr_status_t serf_incoming_rq_wait_for(serf_bucket_t *bucket,
+                                              incoming_rq_status_t wait_for)
+{
+    incoming_request_context_t *ctx = bucket->data;
+    apr_status_t status;
+
+    if (ctx->state == STATE_TRAILERS && wait_for == STATE_BODY) {
+        /* We are done with the body, but not with the request.
+           Can't return EOF yet */
+        wait_for = STATE_DONE;
+    }
+
+    while (ctx->state < wait_for) {
+        switch (ctx->state) {
+            case STATE_INIT:
+                status = serf_linebuf_fetch(&ctx->linebuf, ctx->stream,
+                                            SERF_NEWLINE_ANY);
+                if (status)
+                    return status;
+
+                status = serf_incoming_rq_parse_rqline(bucket);
+                if (status)
+                    return status;
+                break;
+            case STATE_HEADERS:
+            case STATE_TRAILERS:
+                status = serf_linebuf_fetch(&ctx->linebuf, ctx->stream,
+                                            SERF_NEWLINE_ANY);
+                if (status)
+                    return status;
+
+                status = serf_incoming_rq_parse_headerline(bucket);
+                if (status)
+                    return status;
+                break;
+            case STATE_PREBODY:
+                /* TODO: Determine the body type.. Wrap bucket if necessary,
+                   etc.*/
+
+                /* What kind of body do we expect */
+                {
+                    const char *te;
+
+                    /* Without Transfer-Encoding or Content-Length the raw
+                       stream itself is used as the body. */
+                    ctx->body = ctx->stream;
+                    te = serf_bucket_headers_get(ctx->headers,
+                                                 "Transfer-Encoding");
+
+                    if (te && strcasecmp(te, "chunked") == 0) {
+                        ctx->body = serf_bucket_dechunk_create(
+                                        ctx->stream, bucket->allocator);
+                        ctx->expect_trailers = true;
+                    }
+                    else {
+                        const char *cl;
+
+                        cl = serf_bucket_headers_get(ctx->headers,
+                                                     "Content-Length");
+
+                        if (cl) {
+                            char *cl_end;
+                            apr_int64_t length;
+
+                            errno = 0;
+                            length = apr_strtoi64(cl, &cl_end, 10);
+                            if (errno == ERANGE) {
+                                return APR_FROM_OS_ERROR(ERANGE);
+                            }
+
+                            /* No digits at all is not a length. */
+                            if (cl_end == cl) {
+                                return APR_EINVAL;
+                            }
+
+                            /* Tolerate trailing blanks, but nothing else. */
+                            while (*cl_end == ' ' || *cl_end == '\t')
+                                cl_end++;
+
+                            /* A negative value would otherwise turn into
+                               a huge unsigned body length. */
+                            if (length < 0 || *cl_end != '\0') {
+                                return APR_EINVAL;
+                            }
+
+                            ctx->body = serf_bucket_response_body_create(
+                                            ctx->body, (apr_uint64_t)length,
+                                            bucket->allocator);
+                        }
+                    }
+                    ctx->state++;
+                }
+                break;
+            case STATE_DONE:
+                break;
+            default:
+                return APR_EGENERAL; /* Should never happen */
+        }
+    }
+
+    return (ctx->state == STATE_DONE) ? APR_EOF : APR_SUCCESS;
+}
+
+/* Read up to REQUESTED bytes of the request body into DATA/LEN.
+
+   The request line and headers are parsed first. Any non-zero status from
+   that step is returned with *LEN set to 0. When a chunked body reports
+   EOF, the parser switches to the trailers and APR_SUCCESS is returned
+   instead, so the caller keeps reading until the trailers are consumed. */
+static apr_status_t serf_incoming_rq_read(serf_bucket_t *bucket,
+                                          apr_size_t requested,
+                                          const char **data,
+                                          apr_size_t *len)
+{
+    incoming_request_context_t *rq = bucket->data;
+    apr_status_t rv = serf_incoming_rq_wait_for(bucket, STATE_BODY);
+
+    if (rv != APR_SUCCESS) {
+        *len = 0;
+        return rv;
+    }
+    if (rq->body == NULL) {
+        *len = 0;
+        return APR_EOF;
+    }
+
+    rv = serf_bucket_read(rq->body, requested, data, len);
+    if (!rq->expect_trailers || !APR_STATUS_IS_EOF(rv))
+        return rv;
+
+    /* End of the body, but trailer lines still have to be read. */
+    rq->state = STATE_TRAILERS;
+    return APR_SUCCESS;
+}
+
+/* Read one line of the request body, using the ACCEPTABLE newline types.
+
+   The request line and headers are parsed first. Any non-zero status from
+   that step is returned with *FOUND and *LEN set to 0. When a chunked body
+   reports EOF, the parser switches to the trailers and APR_SUCCESS is
+   returned instead. */
+static apr_status_t serf_incoming_rq_readline(serf_bucket_t *bucket,
+                                              int acceptable,
+                                              int *found,
+                                              const char **data,
+                                              apr_size_t *len)
+{
+    incoming_request_context_t *rq = bucket->data;
+    apr_status_t rv = serf_incoming_rq_wait_for(bucket, STATE_BODY);
+
+    if (rv != APR_SUCCESS || rq->body == NULL) {
+        *found = 0;
+        *len = 0;
+        return (rv != APR_SUCCESS) ? rv : APR_EOF;
+    }
+
+    rv = serf_bucket_readline(rq->body, acceptable, found, data, len);
+    if (!rq->expect_trailers || !APR_STATUS_IS_EOF(rv))
+        return rv;
+
+    /* End of the body, but trailer lines still have to be read. */
+    rq->state = STATE_TRAILERS;
+    return APR_SUCCESS;
+}
+
+/* Read up to REQUESTED bytes of the request body into at most VECS_SIZE
+   entries of VECS, storing the number of entries used in *VECS_USED.
+
+   The request line and headers are parsed first. Any non-zero status from
+   that step is returned with *VECS_USED set to 0. When a chunked body
+   reports EOF, the parser switches to the trailers and APR_SUCCESS is
+   returned instead. */
+static apr_status_t serf_incoming_rq_read_iovec(serf_bucket_t *bucket,
+                                                apr_size_t requested,
+                                                int vecs_size,
+                                                struct iovec *vecs,
+                                                int *vecs_used)
+{
+    incoming_request_context_t *rq = bucket->data;
+    apr_status_t rv = serf_incoming_rq_wait_for(bucket, STATE_BODY);
+
+    if (rv != APR_SUCCESS || rq->body == NULL) {
+        *vecs_used = 0;
+        return (rv != APR_SUCCESS) ? rv : APR_EOF;
+    }
+
+    rv = serf_bucket_read_iovec(rq->body, requested, vecs_size,
+                                vecs, vecs_used);
+    if (!rq->expect_trailers || !APR_STATUS_IS_EOF(rv))
+        return rv;
+
+    /* End of the body, but trailer lines still have to be read. */
+    rq->state = STATE_TRAILERS;
+    return APR_SUCCESS;
+}
+
+/* Peek at the request body without consuming it.
+
+   The request line and headers are parsed first. While that is not yet
+   possible, *LEN is set to 0 and:
+     - a read error is returned as is;
+     - APR_EOF is returned when the request has been read completely
+       (STATE_DONE), matching what the read functions report;
+     - any other EOF means the stream ended in the middle of the request
+       and is reported as SERF_ERROR_TRUNCATED_STREAM;
+     - any other non-zero status (no error, just nothing to show yet) is
+       mapped to APR_SUCCESS.
+
+   When a chunked body reports EOF, APR_SUCCESS is returned instead since
+   the trailers still follow. */
+static apr_status_t serf_incoming_rq_peek(serf_bucket_t *bucket,
+                                          const char **data,
+                                          apr_size_t *len)
+{
+    incoming_request_context_t *ctx = bucket->data;
+    apr_status_t status;
+
+    status = serf_incoming_rq_wait_for(bucket, STATE_BODY);
+    if (status || !ctx->body) {
+        *len = 0;
+
+        if (SERF_BUCKET_READ_ERROR(status))
+            return status;
+        else if (APR_STATUS_IS_EOF(status))
+            return (ctx->state == STATE_DONE) ? APR_EOF
+                                              : SERF_ERROR_TRUNCATED_STREAM;
+
+        return status ? APR_SUCCESS : APR_EOF;
+    }
+
+    status = serf_bucket_peek(ctx->body, data, len);
+    if (APR_STATUS_IS_EOF(status) && ctx->expect_trailers) {
+        /* Peeking consumes nothing. Only move on to the trailers when no
+           body bytes are left to read; otherwise the next read would skip
+           the bytes that were just shown to the caller. */
+        if (*len == 0)
+            ctx->state = STATE_TRAILERS;
+        status = APR_SUCCESS;
+    }
+    return status;
+}
+
+/* Destroy the bucket: release the copied method and path strings, the
+   headers bucket and the body bucket (or the stream, when no body bucket
+   was set up yet), then the bucket and its context. */
+static void serf_incoming_rq_destroy(serf_bucket_t *bucket)
+{
+    incoming_request_context_t *rq = bucket->data;
+
+    /* The body takes precedence over the stream. NOTE(review): presumably
+       a wrapping body bucket destroys the stream it wraps - confirm. */
+    serf_bucket_t *payload = rq->body ? rq->body : rq->stream;
+
+    if (rq->method != NULL)
+        serf_bucket_mem_free(bucket->allocator, (void *)rq->method);
+    if (rq->path_raw != NULL)
+        serf_bucket_mem_free(bucket->allocator, (void *)rq->path_raw);
+    if (rq->headers != NULL)
+        serf_bucket_destroy(rq->headers);
+    if (payload != NULL)
+        serf_bucket_destroy(payload);
+
+    serf_default_destroy_and_data(bucket);
+}
+
+/* Provide the parsed request line and headers of BUCKET.
+
+   HEADERS, METHOD, PATH and HTTP_VERSION are all optional. When the
+   request line and headers are available they receive the values stored
+   in the bucket and APR_SUCCESS is returned. Otherwise the non-zero
+   status from the parser is returned and every provided output is set
+   to NULL (or 0 for the version). */
+apr_status_t serf_bucket_incoming_request_read(
+    serf_bucket_t **headers,
+    const char **method,
+    const char **path,
+    int *http_version,
+    serf_bucket_t *bucket)
+{
+    incoming_request_context_t *rq = bucket->data;
+    apr_status_t rv = serf_incoming_rq_wait_for(bucket, STATE_BODY);
+    int ready = (rv == APR_SUCCESS);
+
+    if (headers != NULL)
+        *headers = ready ? rq->headers : NULL;
+    if (method != NULL)
+        *method = ready ? rq->method : NULL;
+    if (path != NULL)
+        *path = ready ? rq->path_raw : NULL;
+    if (http_version != NULL)
+        *http_version = ready ? rq->version : 0;
+
+    return rv;
+}
+
+/* Drive the parser of BUCKET until the request line and all headers have
+   been read, and return the resulting status. */
+apr_status_t serf_bucket_incoming_request_wait_for_headers(
+    serf_bucket_t *bucket)
+{
+    /* The headers are complete once the parser has reached the body. */
+    apr_status_t rv = serf_incoming_rq_wait_for(bucket, STATE_BODY);
+
+    return rv;
+}
+
+
+const serf_bucket_type_t serf_bucket_type_incoming_request = { /* Bucket type for incoming requests; entries are positional. */
+ "INCOMING-REQUEST",
+ serf_incoming_rq_read, /* defined in this file */
+ serf_incoming_rq_readline, /* defined in this file */
+ serf_incoming_rq_read_iovec, /* defined in this file */
+ serf_default_read_for_sendfile, /* serf default */
+ serf_buckets_are_v2,
+ serf_incoming_rq_peek, /* defined in this file */
+ serf_incoming_rq_destroy, /* defined in this file */
+ serf_default_read_bucket, /* serf default */
+ serf_default_get_remaining, /* serf default */
+ serf_default_ignore_config /* serf default */
+};
Modified: serf/trunk/serf_bucket_types.h
URL:
http://svn.apache.org/viewvc/serf/trunk/serf_bucket_types.h?rev=1714539&r1=1714538&r2=1714539&view=diff
==============================================================================
--- serf/trunk/serf_bucket_types.h (original)
+++ serf/trunk/serf_bucket_types.h Mon Nov 16 10:32:49 2015
@@ -81,6 +81,30 @@ void serf_bucket_request_set_root(
serf_bucket_t *bucket,
const char *root_url);
+
+/* ==================================================================== */
+
+extern const serf_bucket_type_t serf_bucket_type_incoming_request;
+#define SERF_BUCKET_IS_INCOMING_REQUEST(b) \
+    SERF_BUCKET_CHECK((b), incoming_request)
+
+/* Create a bucket that parses an incoming HTTP/1 style request read from
+   STREAM, allocating from ALLOCATOR. */
+serf_bucket_t *serf_bucket_incoming_request_create(
+    serf_bucket_t *stream,
+    serf_bucket_alloc_t *allocator);
+
+/* Retrieve the parsed request line and headers of REQUEST. All output
+   arguments are optional. Returns APR_SUCCESS once the request line and
+   the headers have been read; otherwise returns the status of the
+   underlying read and sets every provided output to NULL (or 0 for
+   HTTP_VERSION). */
+/* ### Add RESULT_POOL argument? */
+apr_status_t serf_bucket_incoming_request_read(
+    serf_bucket_t **headers,
+    const char **method,
+    const char **path,
+    int *http_version,
+    serf_bucket_t *request);
+
+/* Read from REQUEST until its request line and headers have been parsed,
+   and return the resulting status. */
+apr_status_t serf_bucket_incoming_request_wait_for_headers(
+    serf_bucket_t *request);
+
/* ==================================================================== */