Author: rhuijben
Date: Sat Oct 31 19:53:57 2015
New Revision: 1711679
URL: http://svn.apache.org/viewvc?rev=1711679&view=rev
Log:
Checkpoint a rewrite of the prototype http2 protocol handler into a fully
event-driven protocol handler.
This patch starts delegating reading of the headers and response to the stream
handler, which was already responsible for writing out the request.
* protocols/http2_protocol.c
(http2_process): New prototype.
(serf_http2_protocol_t): Store settings. Update state for event driven
handling.
(serf__http2_protocol_init): Update init.
(setup_for_http2): Update caller.
(serf_http2__enqueue_frame): Destroy bucket on error.
(http2_handle_priority,
http2_handle_promise,
http2_handle_frame_reset,
http2_handle_stream_window_update,
http2_handle_connection_window_update,
http2_handle_ping,
http2_handle_ping_ack,
http2_handle_settings,
http2_handle_goaway): New functions handling specific frame data.
(http2_handle_continuation,
http2_end_of_frame,
http2_bucket_processor): New functions implementing specific callback types.
(http2_read): Rename to...
(http2_process): ... this, and implement the logic in a completely different
way.
(http2_protocol_read): Update caller.
(serf_http2__allocate_stream_id): Tweak for variable name updates.
(serf_http2__stream_get): Resolve endless loop. Tweak varnames.
(serf_http2__enqueue_stream_reset): New stub function.
* protocols/http2_protocol.h
(SERF_H2_assert): New define.
(HTTP2_DEFAULT_MAX_CONCURRENT,
HTTP2_PRIORITY_DATA_SIZE,
HTTP2_RST_DATA_SIZE,
HTTP2_PROMISE_DATA_SIZE,
HTTP2_PING_DATA_SIZE,
HTTP2_GOWAWAY_DATA_SIZE,
HTTP2_WINDOW_UPDATE_DATA_SIZE,
HTTP2_SETTING_SIZE): New defines for magic values.
(serf_http2_stream_data_t): New typedef.
(serf_http2_stream_t): Hide some data inside serf_http2_stream_data_t.
(serf_http2_processor_t): New typedef.
(serf_http2__enqueue_stream_reset): New function.
(serf_http2__stream_get_by_id): Rename to...
(serf_http2__stream_get): ... this to match implementation.
(serf_http2__stream_reset,
serf_http2__stream_handle_hpack,
serf_http2__stream_handle_data,
serf_http2__stream_processor): New functions.
* protocols/http2_stream.c
(serf_http2_stream_data_t): New struct.
(serf_http2__stream_create): Init data struct.
(serf_http2__stream_cleanup): Cleanup data struct.
(serf_http2__stream_setup_request): Update caller.
(serf_http2__stream_reset,
stream_response_eof,
serf_http2__stream_handle_hpack,
serf_http2__stream_handle_data,
serf_http2__stream_processor): New functions.
Modified:
serf/trunk/protocols/http2_protocol.c
serf/trunk/protocols/http2_protocol.h
serf/trunk/protocols/http2_stream.c
Modified: serf/trunk/protocols/http2_protocol.c
URL:
http://svn.apache.org/viewvc/serf/trunk/protocols/http2_protocol.c?rev=1711679&r1=1711678&r2=1711679&view=diff
==============================================================================
--- serf/trunk/protocols/http2_protocol.c (original)
+++ serf/trunk/protocols/http2_protocol.c Sat Oct 31 19:53:57 2015
@@ -43,6 +43,9 @@ http2_protocol_hangup(serf_connection_t
static void
http2_protocol_teardown(serf_connection_t *conn);
+static apr_status_t
+http2_process(serf_http2_protocol_t *h2);
+
static serf_bucket_t *
serf_bucket_create_numberv(serf_bucket_alloc_t *allocator, const char *format,
...)
{
@@ -132,26 +135,42 @@ struct serf_http2_protocol_t
apr_pool_t *pool;
serf_connection_t *conn;
serf_bucket_t *ostream;
+ serf_bucket_alloc_t *allocator;
- serf_hpack_table_t *hpack_tbl;
+ serf_http2_processor_t processor;
+ void *processor_baton;
+ serf_bucket_t *read_frame; /* Frame currently being read */
+ int in_frame;
- apr_uint32_t default_lr_window;
- apr_uint32_t default_rl_window;
+ serf_hpack_table_t *hpack_tbl;
+ serf_config_t *config;
- apr_int64_t lr_window; /* local->remote */
- apr_int64_t rl_window; /* remote->local */
- apr_int32_t next_local_streamid;
- apr_int32_t next_remote_streamid;
+ /* Local -> Remote. Settings provided by other side */
+ apr_uint32_t lr_default_window;
+ apr_uint32_t lr_window;
+ apr_uint32_t lr_max_framesize;
+ apr_uint32_t lr_max_headersize;
+ apr_uint32_t lr_max_concurrent;
+ apr_int32_t lr_next_streamid;
+ char lr_push_enabled;
+
+ /* Remote -> Local. Settings set by us. Acknowledged by other side */
+ apr_uint32_t rl_default_window;
+ apr_uint32_t rl_window;
+ apr_uint32_t rl_max_framesize;
+ apr_uint32_t rl_max_headersize;
+ apr_uint32_t rl_max_concurrent;
+ apr_int32_t rl_next_streamid;
+ char rl_push_enabled;
serf_http2_stream_t *first;
serf_http2_stream_t *last;
- char buffer[HTTP2_DEFAULT_MAX_FRAMESIZE];
- apr_size_t buffer_used;
- serf_bucket_t *cur_frame;
- serf_bucket_t *cur_payload;
- int in_payload;
+ int setting_acks;
+ int enforce_flow_control;
+ serf_bucket_t *continuation_bucket;
+ apr_int32_t continuation_streamid;
};
static apr_status_t
@@ -179,6 +198,7 @@ void serf__http2_protocol_init(serf_conn
serf_http2_protocol_t *ctx;
apr_pool_t *protocol_pool;
serf_bucket_t *tmp;
+ const int WE_ARE_CLIENT = 1;
apr_pool_create(&protocol_pool, conn->pool);
@@ -186,15 +206,30 @@ void serf__http2_protocol_init(serf_conn
ctx->pool = protocol_pool;
ctx->conn = conn;
ctx->ostream = conn->ostream_tail;
+ ctx->allocator = conn->allocator;
+ ctx->config = conn->config;
/* Defaults until negotiated */
- ctx->default_lr_window = HTTP2_DEFAULT_WINDOW_SIZE;
- ctx->default_rl_window = HTTP2_DEFAULT_WINDOW_SIZE;
-
- ctx->lr_window = ctx->default_lr_window;
- ctx->rl_window = ctx->default_rl_window;
- ctx->next_local_streamid = 1; /* 2 if we would be the server */
- ctx->next_remote_streamid = 2; /* 1 if we would be the client */
+ ctx->rl_default_window = HTTP2_DEFAULT_WINDOW_SIZE;
+ ctx->rl_window = HTTP2_DEFAULT_WINDOW_SIZE;
+ ctx->rl_next_streamid = WE_ARE_CLIENT ? 2 : 1;
+ ctx->rl_max_framesize = HTTP2_DEFAULT_MAX_FRAMESIZE;
+ ctx->rl_max_headersize = APR_UINT32_MAX;
+ ctx->rl_max_concurrent = HTTP2_DEFAULT_MAX_CONCURRENT;
+ ctx->rl_push_enabled = TRUE;
+
+ ctx->lr_default_window = HTTP2_DEFAULT_WINDOW_SIZE;
+ ctx->lr_window = HTTP2_DEFAULT_WINDOW_SIZE;
+ ctx->lr_next_streamid = WE_ARE_CLIENT ? 1 : 2;
+ ctx->lr_max_framesize = HTTP2_DEFAULT_MAX_FRAMESIZE;
+ ctx->lr_max_headersize = APR_UINT32_MAX;
+ ctx->lr_max_concurrent = HTTP2_DEFAULT_MAX_CONCURRENT;
+ ctx->lr_push_enabled = TRUE;
+
+ ctx->setting_acks = 0;
+ ctx->enforce_flow_control = TRUE;
+ ctx->continuation_bucket = NULL;
+ ctx->continuation_streamid = 0;
ctx->first = ctx->last = NULL;
@@ -253,9 +288,9 @@ setup_for_http2(serf_http2_protocol_t *h
serf_http2_stream_t *stream;
stream = serf_http2__stream_create(h2, -1,
- h2->default_lr_window,
- h2->default_rl_window,
- h2->conn->allocator);
+ h2->lr_default_window,
+ h2->rl_default_window,
+ h2->allocator);
if (h2->first)
{
@@ -287,7 +322,10 @@ serf_http2__enqueue_frame(serf_http2_pro
status = serf_bucket_peek(h2->ostream, &data, &len);
if (SERF_BUCKET_READ_ERROR(status))
- return status;
+ {
+ serf_bucket_destroy(frame);
+ return status;
+ }
if (len == 0)
{
@@ -322,178 +360,871 @@ serf_http2__enqueue_frame(serf_http2_pro
return APR_SUCCESS;
}
+/* Implements serf_bucket_prefix_handler_t.
+ Handles PRIORITY frames and the priority prefix of HEADERS frames */
+static apr_status_t
+http2_handle_priority(void *baton,
+ serf_bucket_t *bucket,
+ const char *data,
+ apr_size_t len)
+{
+ serf_http2_stream_t *stream = baton;
+
+ if (len != HTTP2_PRIORITY_DATA_SIZE)
+ return SERF_ERROR_HTTP2_FRAME_SIZE_ERROR;
+
+ if (stream == NULL)
+ return APR_SUCCESS; /* Nothing to record this on */
+ /* ### TODO: Store priority information on stream */
+ SERF_H2_assert(stream->h2 != NULL);
+
+ return APR_SUCCESS;
+}
+
+/* Implements serf_bucket_prefix_handler_t.
+ Handles the promise prefix of PUSH_PROMISE frames */
static apr_status_t
-http2_read(serf_connection_t *conn)
+http2_handle_promise(void *baton,
+ serf_bucket_t *bucket,
+ const char *data,
+ apr_size_t len)
{
- serf_http2_protocol_t *ctx = conn->protocol_baton;
- apr_status_t status = APR_SUCCESS;
+ serf_http2_stream_t *stream = baton;
- while (TRUE)
- {
- status = APR_SUCCESS;
+ if (len != HTTP2_PROMISE_DATA_SIZE)
+ return SERF_ERROR_HTTP2_FRAME_SIZE_ERROR;
- if (ctx->cur_frame)
- {
- const char *data;
- apr_size_t len;
+ /* ### TODO: Prepare reading promise on stream */
+ SERF_H2_assert(stream->h2 != NULL);
- if (! ctx->in_payload)
- {
- unsigned char flags;
- unsigned char frametype;
- apr_int32_t streamid;
- apr_uint64_t size;
-
- status = serf__bucket_http2_unframe_read_info(ctx->cur_frame,
- &streamid,
&frametype,
- &flags);
+ return APR_SUCCESS;
+}
- if (status && !APR_STATUS_IS_EOF(status))
- break;
+/* Implements serf_bucket_prefix_handler_t.
+ Handles the payload of RST_STREAM frames */
+static apr_status_t
+http2_handle_frame_reset(void *baton,
+ serf_bucket_t *bucket,
+ const char *data,
+ apr_size_t len)
+{
+ serf_http2_stream_t *stream = baton;
- size = serf_bucket_get_remaining(ctx->cur_frame);
+ if (len != HTTP2_RST_DATA_SIZE)
+ return SERF_ERROR_HTTP2_FRAME_SIZE_ERROR;
- serf__log(LOGLVL_INFO, SERF_LOGHTTP2, conn->config,
- "Start 0x%02x http2 frame on stream 0x%x, flags=0x%x,
size=0x%x\n",
- (int)frametype, (int)streamid, (int)flags, (int)size);
-
- ctx->in_payload = TRUE;
-
- if (flags & HTTP2_FLAG_PADDED)
- {
- ctx->cur_payload =
- serf__bucket_http2_unpad_create(
- ctx->cur_frame, TRUE,
- ctx->cur_frame->allocator);
- }
- else
- ctx->cur_payload = ctx->cur_frame;
+ SERF_H2_assert(stream->h2 != NULL);
- if (frametype == HTTP2_FRAME_TYPE_HEADERS)
- {
- ctx->cur_payload = serf__bucket_hpack_decode_create(
- ctx->cur_payload,
- NULL, NULL,
- 16384, ctx->hpack_tbl,
- ctx->cur_frame->allocator);
- }
- }
+ /* ### TODO: Handle error code, etc. */
+ stream->status = H2S_CLOSED;
- status = serf_bucket_read(ctx->cur_payload,
- sizeof(ctx->buffer) - ctx->buffer_used,
- &data, &len);
+ return APR_SUCCESS;
+}
- if (SERF_BUCKET_READ_ERROR(status))
- break;
+/* Implements serf_bucket_prefix_handler_t.
+ Handles WINDOW_UPDATE frames when they apply to a stream */
+static apr_status_t
+http2_handle_stream_window_update(void *baton,
+ serf_bucket_t *bucket,
+ const char *data,
+ apr_size_t len)
+{
+ serf_http2_stream_t *stream = baton;
- if (len)
- {
- memcpy(&ctx->buffer[ctx->buffer_used], data, len);
- ctx->buffer_used += len;
- }
+ if (len != HTTP2_WINDOW_UPDATE_DATA_SIZE)
+ return SERF_ERROR_HTTP2_FRAME_SIZE_ERROR;
- if (APR_STATUS_IS_EOF(status))
- {
- apr_int32_t streamid;
- unsigned char frametype;
- unsigned char flags;
-
- serf__bucket_http2_unframe_read_info(ctx->cur_frame,
- &streamid, &frametype,
- &flags);
- serf__log(LOGLVL_INFO, SERF_LOGHTTP2, conn->config,
- "Done 0x%02x http2 frame on stream 0x%x, flags=0x%x,
size=0x%x\n",
- (int)frametype, (int)streamid, (int)flags,
(int)ctx->buffer_used);
-
- if (frametype == HTTP2_FRAME_TYPE_DATA
- || frametype == HTTP2_FRAME_TYPE_HEADERS)
- {
- /* Ugly hack to dump body. Memory LEAK! */
- serf__log(LOGLVL_INFO, SERF_LOGHTTP2, conn->config,
- "%s\n", apr_pstrmemdup(conn->pool, ctx->buffer,
ctx->buffer_used));
- }
-
- if (frametype == HTTP2_FRAME_TYPE_GOAWAY && conn)
- serf__log(LOGLVL_WARNING, SERF_LOGHTTP2, conn->config,
- "Go away reason %d: %s\n", ctx->buffer[7],
- apr_pstrmemdup(conn->pool,
- &ctx->buffer[8],
-
(ctx->buffer_used >= 8)
- ?
ctx->buffer_used-8 : 0));
-
- if (frametype == HTTP2_FRAME_TYPE_RST_STREAM && conn)
- serf__log(LOGLVL_WARNING, SERF_LOGHTTP2, conn->config,
- "Reset reason %d: %s\n", ctx->buffer[7],
- apr_pstrmemdup(conn->pool,
- &ctx->buffer[8],
- (ctx->buffer_used >= 8)
- ? ctx->buffer_used - 8 : 0));
-
- if (frametype == HTTP2_FRAME_TYPE_SETTINGS
- && !(flags & HTTP2_FLAG_ACK))
- {
- /* Always ack settings */
- serf_http2__enqueue_frame(
- ctx,
+ SERF_H2_assert(stream->h2 != NULL);
+
+ return APR_SUCCESS;
+}
+
+/* Implements serf_bucket_prefix_handler_t.
+ Handles WINDOW_UPDATE frames when they apply to the connection */
+static apr_status_t
+http2_handle_connection_window_update(void *baton,
+ serf_bucket_t *bucket,
+ const char *data,
+ apr_size_t len)
+{
+ serf_http2_protocol_t *h2 = baton;
+
+ if (len != HTTP2_WINDOW_UPDATE_DATA_SIZE)
+ return SERF_ERROR_HTTP2_FRAME_SIZE_ERROR;
+
+ SERF_H2_assert(h2 != NULL);
+
+ return APR_SUCCESS;
+}
+
+/* Implements serf_bucket_prefix_handler_t.
+ Handles PING frames for pings initiated remotely */
+static apr_status_t
+http2_handle_ping(void *baton,
+ serf_bucket_t *bucket,
+ const char *data,
+ apr_size_t len)
+{
+ serf_http2_protocol_t *h2 = baton;
+ serf_bucket_t *body;
+ apr_status_t status;
+
+ if (len != HTTP2_PING_DATA_SIZE)
+ return SERF_ERROR_HTTP2_FRAME_SIZE_ERROR;
+
+ SERF_H2_assert(h2 != NULL);
+
+ /* Reply with a PONG (=PING + ACK) with the same data*/
+
+ body = serf_bucket_simple_copy_create(data, len,
+ h2->allocator);
+
+ status = serf_http2__enqueue_frame(
+ h2,
+ serf__bucket_http2_frame_create(body,
+ HTTP2_FRAME_TYPE_PING,
+ HTTP2_FLAG_ACK,
+ NULL, NULL, NULL,
+ h2->lr_max_framesize,
+ NULL, NULL,
+ h2->allocator),
+ TRUE /* pump */);
+
+ if (SERF_BUCKET_READ_ERROR(status))
+ return status;
+
+ return APR_SUCCESS;
+}
+
+/* Implements serf_bucket_prefix_handler_t.
+ Handles PING frames for pings initiated locally */
+static apr_status_t
+http2_handle_ping_ack(void *baton,
+ serf_bucket_t *bucket,
+ const char *data,
+ apr_size_t len)
+{
+ serf_http2_protocol_t *h2 = baton;
+ if (len != HTTP2_PING_DATA_SIZE)
+ return SERF_ERROR_HTTP2_FRAME_SIZE_ERROR;
+
+ SERF_H2_assert(h2 != NULL);
+
+ /* Did we send a ping? */
+
+ return APR_SUCCESS;
+}
+
+/* Implements serf_bucket_prefix_handler_t.
+ Handles SETTINGS frames */
+static apr_status_t
+http2_handle_settings(void *baton,
+ serf_bucket_t *bucket,
+ const char *data,
+ apr_size_t len)
+{
+ serf_http2_protocol_t *h2 = baton;
+ apr_size_t i;
+ const struct setting_t
+ {
+ unsigned char s1, s0;
+ unsigned char v3, v2, v1, v0;
+ } *setting;
+
+ if ((len % HTTP2_SETTING_SIZE) != 0)
+ return SERF_ERROR_HTTP2_FRAME_SIZE_ERROR;
+
+ /* ### TODO: Handle settings */
+ setting = (const void *)data;
+ for (i = 0, setting = (const void *)data;
+ i < len;
+ i += sizeof(*setting), setting++)
+ {
+ apr_uint16_t id = (setting->s1 << 8) | setting->s0;
+ apr_uint32_t value = (setting->v3 << 24) | (setting->v2 << 16)
+ | (setting->v1 << 8) | setting->v0;
+
+ switch (id)
+ {
+ case HTTP2_SETTING_HEADER_TABLE_SIZE:
+ /* TODO: Send to hpack table */
+ serf__log(LOGLVL_INFO, SERF_LOGHTTP2, h2->config,
+ "Setting HPACK Table size to %u\n", value);
+ break;
+ case HTTP2_SETTING_ENABLE_PUSH:
+ serf__log(LOGLVL_INFO, SERF_LOGHTTP2, h2->config,
+ "Setting Push enabled: %u\n", value);
+ h2->lr_push_enabled = (value != 0);
+ break;
+ case HTTP2_SETTING_MAX_CONCURRENT_STREAMS:
+ serf__log(LOGLVL_INFO, SERF_LOGHTTP2, h2->config,
+ "Setting Max Concurrent %u\n", value);
+ h2->lr_max_concurrent = value;
+ break;
+ case HTTP2_SETTING_INITIAL_WINDOW_SIZE:
+ /* Sanitize? */
+ serf__log(LOGLVL_INFO, SERF_LOGHTTP2, h2->config,
+ "Setting Initial Window Size %u\n", value);
+ h2->lr_default_window = value;
+ break;
+ case HTTP2_SETTING_MAX_FRAME_SIZE:
+ /* Sanitize? */
+ serf__log(LOGLVL_INFO, SERF_LOGHTTP2, h2->config,
+ "Setting Max framesize %u\n", value);
+ h2->lr_max_framesize = value;
+ break;
+ case HTTP2_SETTING_MAX_HEADER_LIST_SIZE:
+ serf__log(LOGLVL_INFO, SERF_LOGHTTP2, h2->config,
+ "Setting Max header list size %u\n", value);
+ h2->lr_max_headersize = value;
+ break;
+ default:
+ /* An endpoint that receives a SETTINGS frame with any unknown
+ or unsupported identifier MUST ignore that setting. */
+ serf__log(LOGLVL_INFO, SERF_LOGHTTP2, h2->config,
+ "Ignoring unknown setting %d, value %u\n", id, value);
+ break;
+ }
+ }
+
+ /* Always ack settings */
+ serf_http2__enqueue_frame(
+ h2,
serf__bucket_http2_frame_create(
NULL,
HTTP2_FRAME_TYPE_SETTINGS,
HTTP2_FLAG_ACK,
NULL, NULL, NULL,
HTTP2_DEFAULT_MAX_FRAMESIZE,
- NULL, NULL, conn->allocator),
- TRUE);
- }
- else if (frametype == HTTP2_FRAME_TYPE_DATA)
- {
- /* Provide a bit of window space to the server after
- receiving data */
- serf_http2__enqueue_frame(
- ctx,
- serf__bucket_http2_frame_create(
- serf_bucket_create_numberv(conn->allocator, "4",
(apr_int32_t)16384),
- HTTP2_FRAME_TYPE_WINDOW_UPDATE, 0,
- &streamid, NULL, NULL,
- HTTP2_DEFAULT_MAX_FRAMESIZE,
- NULL, NULL, conn->allocator),
+ NULL, NULL, h2->allocator),
TRUE);
- }
- else if (frametype == HTTP2_FRAME_TYPE_PING)
- {
- /* TODO: PONG (=Ping Ack) */
- }
-
- serf_bucket_destroy(ctx->cur_payload);
- ctx->cur_frame = ctx->cur_payload = NULL;
- ctx->in_payload = FALSE;
- ctx->buffer_used = 0;
+
+ return APR_SUCCESS;
+}
+
+/* Implements serf_bucket_prefix_handler_t.
+ Handles GOAWAY frames */
+static apr_status_t
+http2_handle_goaway(void *baton,
+ serf_bucket_t *bucket,
+ const char *data,
+ apr_size_t len)
+{
+ serf_http2_protocol_t *h2 = baton;
+
+ SERF_H2_assert(h2 != NULL);
+
+ return APR_SUCCESS;
+}
+
+
+/* Implements serf_bucket_aggregate_eof_t */
+static apr_status_t
+http2_handle_continuation(void *baton,
+ serf_bucket_t *aggregate_bucket)
+{
+ serf_http2_protocol_t *h2 = baton;
+ apr_status_t status;
+ const char *data;
+ apr_size_t len;
+
+ if (h2->continuation_bucket != aggregate_bucket)
+ return APR_EOF; /* This is all we have */
+
+ SERF_H2_assert(h2->read_frame == NULL);
+ SERF_H2_assert(h2->continuation_bucket == aggregate_bucket);
+
+ status = http2_process(h2);
+ if (status)
+ return status;
+
+ if (h2->continuation_bucket == aggregate_bucket)
+ {
+ /* We expect more data in the future. Something
+ was done in http2_process() or it didn't
+ return APR_SUCCESS */
+ return APR_SUCCESS;
+ }
+
+ /* As h2->continuation_bucket is no longer attached we don't
+ recurse on peeking. Just check if there is more */
+ return serf_bucket_peek(aggregate_bucket, &data, &len);
+}
+
+/* Implements the serf__bucket_http2_unframe_set_eof callback */
+static apr_status_t
+http2_end_of_frame(void *baton,
+ serf_bucket_t *frame)
+{
+ serf_http2_protocol_t *h2 = baton;
+
+ SERF_H2_assert(h2->read_frame == frame);
+ h2->read_frame = NULL;
+ h2->in_frame = FALSE;
+ h2->processor = NULL;
+ h2->processor_baton = NULL;
+
+ return APR_SUCCESS;
+}
+
+/* Implements serf_http2_processor_t */
+static apr_status_t
+http2_bucket_processor(void *baton,
+ serf_http2_protocol_t *h2,
+ serf_bucket_t *frame_bucket)
+{
+ struct iovec vecs[16];
+ int vecs_used;
+ serf_bucket_t *payload = baton;
+ apr_status_t status;
+
+ status = serf_bucket_read_iovec(payload, SERF_READ_ALL_AVAIL, 16,
+ vecs, &vecs_used);
+
+ if (APR_STATUS_IS_EOF(status))
+ {
+ SERF_H2_assert(!h2->in_frame && !h2->read_frame);
+ serf_bucket_destroy(payload);
+ }
+
+ return status;
+}
+
+/* Processes incoming HTTP2 data */
+static apr_status_t
+http2_process(serf_http2_protocol_t *h2)
+{
+ while (TRUE)
+ {
+ apr_status_t status;
+ serf_bucket_t *body;
+
+ if (h2->processor)
+ {
+ status = h2->processor(h2->processor_baton, h2, h2->read_frame);
+
+ if (SERF_BUCKET_READ_ERROR(status))
+ return status;
+ else if (APR_STATUS_IS_EOF(status))
+ {
+ /* ### frame ended */
+ SERF_H2_assert(h2->read_frame == NULL);
+ h2->processor = NULL;
+ h2->processor_baton = NULL;
}
- else
- continue;
+ else if (h2->in_frame)
+ {
+ if (status)
+ return status;
+ else
+ continue;
+ }
+ }
+ else
+ {
+ SERF_H2_assert(!h2->in_frame);
}
- if (APR_STATUS_IS_EOF(status))
+ body = h2->read_frame;
+
+ if (! body)
{
- const char *data;
- apr_size_t len;
- status = serf_bucket_peek(conn->stream, &data, &len);
+ SERF_H2_assert(!h2->in_frame);
+
+ body = serf__bucket_http2_unframe_create(
+ h2->conn->stream, FALSE,
+ h2->rl_max_framesize,
+ h2->allocator);
+
+ serf__bucket_http2_unframe_set_eof(body,
+ http2_end_of_frame, h2);
- if (SERF_BUCKET_READ_ERROR(status)
- || APR_STATUS_IS_EOF(status))
+ serf_bucket_set_config(body, h2->config);
+ h2->read_frame = body;
+ }
+
+ if (! h2->in_frame)
+ {
+ apr_int32_t sid;
+ unsigned char frametype;
+ unsigned char frameflags;
+ apr_size_t remaining;
+ serf_http2_processor_t process_handler = NULL;
+ void *process_baton = NULL;
+ serf_bucket_t *process_bucket = NULL;
+ serf_http2_stream_t *stream;
+ apr_uint32_t reset_reason;
+
+ status = serf__bucket_http2_unframe_read_info(body, &sid,
+ &frametype,
&frameflags);
+
+ if (APR_STATUS_IS_EOF(status))
{
- /* We have a real EOF*/
- break;
+ /* Entire frame is already read (just header) */
+ SERF_H2_assert(h2->read_frame == NULL);
+ SERF_H2_assert(! h2->in_frame);
+ }
+ else if (status)
+ {
+ SERF_H2_assert(h2->read_frame != NULL);
+ SERF_H2_assert(! h2->in_frame);
+ return status;
+ }
+ else
+ {
+ h2->in_frame = TRUE;
+ SERF_H2_assert(h2->read_frame != NULL);
}
- }
- ctx->cur_frame = ctx->cur_payload =
- serf__bucket_http2_unframe_create(conn->stream, FALSE,
- HTTP2_DEFAULT_MAX_FRAMESIZE,
- conn->stream->allocator);
- }
+ serf__log(LOGLVL_INFO, SERF_LOGHTTP2, h2->config,
+ "Reading 0x%x frame, stream=%x, flags=0x%x\n",
+ frametype, sid, frameflags);
- return status;
+ /* If status is EOF then the frame doesn't have/declare a body */
+ switch (frametype)
+ {
+ /* ---------------------------------------------------- */
+ case HTTP2_FRAME_TYPE_DATA:
+ case HTTP2_FRAME_TYPE_HEADERS:
+ case HTTP2_FRAME_TYPE_PUSH_PROMISE:
+ if (h2->continuation_bucket)
+ {
+ h2->continuation_bucket = NULL;
+ h2->continuation_streamid = 0;
+ return APR_EAGAIN;
+ }
+
+ stream = serf_http2__stream_get(h2, sid, TRUE, TRUE);
+
+ if (sid == 0)
+ {
+ /* DATA, HEADERS and PUSH_PROMISE:
+
+ These frames MUST be associated with a stream. If a
+ XXX frame is received whose stream identifier field is 0x0,
+ the recipient MUST respond with a connection error
+ (Section 5.4.1) of type PROTOCOL_ERROR. */
+ return SERF_ERROR_HTTP2_PROTOCOL_ERROR;
+ }
+
+ reset_reason = 0;
+
+ if (frametype == HTTP2_FRAME_TYPE_DATA)
+ {
+ remaining = (apr_size_t)serf_bucket_get_remaining(body);
+
+ if (h2->rl_window < remaining)
+ {
+ if (h2->enforce_flow_control)
+ reset_reason = SERF_ERROR_HTTP2_FLOW_CONTROL_ERROR;
+
+ h2->rl_window = 0;
+ }
+ else
+ h2->rl_window -= remaining;
+
+ if (stream)
+ {
+ if (stream->rl_window < remaining)
+ {
+ if (h2->enforce_flow_control)
+ reset_reason =
SERF_ERROR_HTTP2_FLOW_CONTROL_ERROR;
+
+ stream->rl_window = 0;
+ }
+ else
+ stream->rl_window -= remaining;
+ }
+ }
+
+ /* DATA, HEADERS and PUSH_PROMISE can have padding */
+ if (frameflags & HTTP2_FLAG_PADDED)
+ {
+ body = serf__bucket_http2_unpad_create(body, TRUE,
+ h2->allocator);
+ }
+
+ /* An HEADERS frame can have an included priority 'frame' */
+ if (frametype == HTTP2_FRAME_TYPE_HEADERS
+ && (frameflags & HTTP2_FLAG_PRIORITY))
+ {
+ body = serf_bucket_prefix_create(body,
+ HTTP2_PRIORITY_DATA_SIZE,
+ http2_handle_priority,
+ stream, h2->allocator);
+ }
+
+ if (frametype == HTTP2_FRAME_TYPE_PUSH_PROMISE)
+ {
+ body = serf_bucket_prefix_create(body,
+ HTTP2_PROMISE_DATA_SIZE,
+ http2_handle_promise,
+ stream, h2->allocator);
+ }
+
+ if (!stream)
+ {
+ if (!reset_reason)
+ reset_reason = SERF_ERROR_HTTP2_STREAM_CLOSED;
+ }
+ else
+ switch (frametype)
+ {
+ case HTTP2_FRAME_TYPE_DATA:
+ if (stream->status != H2S_OPEN
+ && stream->status != H2S_HALFCLOSED_LOCAL)
+ {
+ reset_reason = SERF_ERROR_HTTP2_STREAM_CLOSED;
+ }
+ break;
+ case HTTP2_FRAME_TYPE_HEADERS:
+ if (stream->status != H2S_IDLE
+ && stream->status != H2S_RESERVED_LOCAL
+ && stream->status != H2S_OPEN
+ && stream->status != H2S_HALFCLOSED_REMOTE)
+ {
+ reset_reason = SERF_ERROR_HTTP2_STREAM_CLOSED;
+ }
+ break;
+ case HTTP2_FRAME_TYPE_PUSH_PROMISE:
+ if (stream->status != H2S_OPEN
+ && stream->status != H2S_HALFCLOSED_REMOTE)
+ {
+ reset_reason = SERF_ERROR_HTTP2_STREAM_CLOSED;
+ }
+ break;
+ }
+
+ if (reset_reason)
+ {
+ if (stream)
+ serf_http2__stream_reset(stream, reset_reason, TRUE);
+ else
+ serf_http2__enqueue_stream_reset(h2, sid, reset_reason);
+ }
+
+ if (frametype == HTTP2_FRAME_TYPE_HEADERS
+ || frametype == HTTP2_FRAME_TYPE_PUSH_PROMISE)
+ {
+ if (!(frameflags & HTTP2_FLAG_END_HEADERS))
+ {
+ /* This header frame is *directly* followed by
+ continuation frames... We hide this from the
+ stream code, by providing an aggregate that will
+ read through the body of multiple frames */
+
+ h2->continuation_bucket = serf_bucket_aggregate_create(
+ h2->allocator);
+ h2->continuation_streamid = sid;
+
+ serf_bucket_aggregate_append(h2->continuation_bucket,
+ body);
+
+ serf_bucket_aggregate_hold_open(
+ h2->continuation_bucket,
+ http2_handle_continuation, h2);
+ }
+
+ if (stream && !reset_reason)
+ {
+ body = serf_http2__stream_handle_hpack(
+ stream, body, frametype,
+ (frameflags & HTTP2_FLAG_END_STREAM),
+ h2->rl_max_headersize,
+ h2->hpack_tbl, h2->config,
+ h2->allocator);
+ }
+ else
+ {
+ /* Even when we don't want to process the headers we
+ must read them to update the HPACK state */
+ body = serf__bucket_hpack_decode_create(
+ body, NULL, NULL,
h2->rl_max_headersize,
+ h2->hpack_tbl, h2->allocator);
+ }
+ }
+ else /* We have a data bucket */
+ {
+ body = serf_http2__stream_handle_data(
+ stream, body, frametype,
+ (frameflags & HTTP2_FLAG_END_STREAM),
+ h2->config, h2->allocator);
+ }
+
+ if (body)
+ process_bucket = body; /* We will take care of discarding */
+ else
+ {
+ /* The stream wants to handle the reading itself */
+ process_handler = serf_http2__stream_processor;
+ process_baton = stream;
+ }
+ break;
+
+ /* ---------------------------------------------------- */
+ case HTTP2_FRAME_TYPE_PRIORITY:
+ if (sid == 0)
+ {
+ /* The PRIORITY frame always identifies a stream. If a
+ PRIORITY frame is received with a stream identifier of
+ 0x0, the recipient MUST respond with a connection error
+ (Section 5.4.1) of type PROTOCOL_ERROR.*/
+
+ return SERF_ERROR_HTTP2_PROTOCOL_ERROR;
+ }
+ else if (serf_bucket_get_remaining(body)
+ != HTTP2_PRIORITY_DATA_SIZE)
+ {
+ /* A PRIORITY frame with a length other than 5 octets MUST
+ be treated as a stream error (Section 5.4.2) of type
+ FRAME_SIZE_ERROR.*/
+
+ /* ### But we currently upgrade this to a connection error */
+ return SERF_ERROR_HTTP2_FRAME_SIZE_ERROR;
+ }
+
+ stream = serf_http2__stream_get(h2, sid, TRUE, TRUE);
+
+ if (stream)
+ {
+ body = serf_bucket_prefix_create(body,
+ HTTP2_PRIORITY_DATA_SIZE,
+ http2_handle_priority,
+ stream, h2->allocator);
+ }
+
+ /* Just reading will do the right thing now */
+ process_bucket = body;
+ break;
+
+ /* ---------------------------------------------------- */
+ case HTTP2_FRAME_TYPE_RST_STREAM:
+ if (sid == 0)
+ {
+ /* RST_STREAM frames MUST be associated with a stream.
+ If a RST_STREAM frame is received with a stream
+ identifier of 0x0, the recipient MUST treat this as a
+ connection error (Section 5.4.1) of type PROTOCOL_ERROR.
+ */
+
+ return SERF_ERROR_HTTP2_PROTOCOL_ERROR;
+ }
+ else if (serf_bucket_get_remaining(body)
+ != HTTP2_RST_DATA_SIZE)
+ {
+ /* A RST_STREAM frame with a length other than 4 octets MUST
+ be treated as a connection error (Section 5.4.1) of type
+ FRAME_SIZE_ERROR. */
+
+ return SERF_ERROR_HTTP2_FRAME_SIZE_ERROR;
+ }
+
+ stream = serf_http2__stream_get(h2, sid, TRUE, TRUE);
+
+ if (stream)
+ {
+ body = serf_bucket_prefix_create(body,
+
HTTP2_FRAME_TYPE_RST_STREAM,
+ http2_handle_frame_reset,
+ stream, h2->allocator);
+ }
+
+ /* Just reading will do the right thing now */
+ process_bucket = body;
+ break;
+
+ /* ---------------------------------------------------- */
+ case HTTP2_FRAME_TYPE_SETTINGS:
+ if (sid != 0)
+ {
+ /* SETTINGS frames always apply to a connection, never a
+ single stream. The stream identifier for a SETTINGS
+ frame MUST be zero (0x0). If an endpoint receives a
+ SETTINGS frame whose stream identifier field is
+ anything other than 0x0, the endpoint MUST respond
+ with a connection error (Section 5.4.1) of type
+ PROTOCOL_ERROR.
+ */
+ return SERF_ERROR_HTTP2_PROTOCOL_ERROR;
+ }
+
+ remaining = (apr_size_t)serf_bucket_get_remaining(body);
+ if (frameflags & HTTP2_FLAG_ACK)
+ {
+ if (remaining != 0)
+ {
+ /* When this bit is set, the payload of the SETTINGS
+ frame MUST be empty. Receipt of a SETTINGS frame
+ with the ACK flag set and a length field value
+ other than 0 MUST be treated as a connection error
+ (Section 5.4.1) of type FRAME_SIZE_ERROR. */
+ return SERF_ERROR_HTTP2_FRAME_SIZE_ERROR;
+ }
+ h2->setting_acks++;
+ }
+ else if ((remaining % HTTP2_SETTING_SIZE) != 0)
+ {
+ /* A SETTINGS frame with a length other than a multiple of
+ 6 octets MUST be treated as a connection error (Section
+ 5.4.1) of type FRAME_SIZE_ERROR. */
+ return SERF_ERROR_HTTP2_FRAME_SIZE_ERROR;
+ }
+ else
+ {
+ /* Just read everything... We checked it against our
+ max-framesize */
+ body = serf_bucket_prefix_create(body, remaining,
+ http2_handle_settings, h2,
+ h2->allocator);
+ }
+
+ /* Just reading will do the right thing now */
+ process_bucket = body;
+ break;
+
+ /* ---------------------------------------------------- */
+ case HTTP2_FRAME_TYPE_PING:
+ if (sid != 0)
+ {
+ /* PING frames are not associated with any individual
+ stream. If a PING frame is received with a stream
+ identifier field value other than 0x0, the recipient
+ MUST respond with a connection error (Section 5.4.1)
+ of type PROTOCOL_ERROR.*/
+ return SERF_ERROR_HTTP2_PROTOCOL_ERROR;
+ }
+ else if (serf_bucket_get_remaining(body)
+ != HTTP2_PING_DATA_SIZE)
+ {
+ /* Receipt of a PING frame with a length field value other
+ than 8 MUST be treated as a connection error (Section
+ 5.4.1) of type FRAME_SIZE_ERROR.. */
+ return SERF_ERROR_HTTP2_FRAME_SIZE_ERROR;
+ }
+
+ body = serf_bucket_prefix_create(body, HTTP2_PING_DATA_SIZE,
+ (frameflags & HTTP2_FLAG_ACK)
+ ? http2_handle_ping
+ : http2_handle_ping_ack,
+ h2, h2->allocator);
+
+ /* Just reading will do the right thing now */
+ process_bucket = body;
+ break;
+ /* ---------------------------------------------------- */
+ case HTTP2_FRAME_TYPE_GOAWAY:
+ if (sid != 0)
+ {
+ /* The GOAWAY frame applies to the connection, not a
+ specific stream. An endpoint MUST treat a GOAWAY frame
+ with a stream identifier other than 0x0 as a connection
+ error(Section 5.4.1) of type PROTOCOL_ERROR. */
+ return SERF_ERROR_HTTP2_PROTOCOL_ERROR;
+ }
+
+ /* As the final go-away frame is best effort only we are not
+ checking the bodysize against HTTP2_GOWAWAY_DATA_SIZE here.
+ We'll see what we get in the goaway handler.
+
+ Go away frames may contain additional opaque debug
+ information at the end, so instead of reading
+ HTTP2_GOWAWAY_DATA_SIZE bytes, we just read the whole frame.
+ */
+ remaining = (apr_size_t)serf_bucket_get_remaining(body);
+
+ body = serf_bucket_prefix_create(body, remaining,
+ http2_handle_goaway, h2,
+ h2->allocator);
+
+ /* Just reading will do the right thing now */
+ process_bucket = body;
+ break;
+ /* ---------------------------------------------------- */
+ case HTTP2_FRAME_TYPE_WINDOW_UPDATE:
+ if (serf_bucket_get_remaining(body)
+ != HTTP2_WINDOW_UPDATE_DATA_SIZE)
+ {
+ /* A WINDOW_UPDATE frame with a length other than 4 octets
+ MUST be treated as a connection error (Section 5.4.1)
+ of type FRAME_SIZE_ERROR. */
+ return SERF_ERROR_HTTP2_FRAME_SIZE_ERROR;
+ }
+
+ if (sid == 0)
+ {
+ body = serf_bucket_prefix_create(
+ body,
+ HTTP2_WINDOW_UPDATE_DATA_SIZE,
+ http2_handle_connection_window_update, h2,
+ h2->allocator);
+ }
+ else
+ {
+ stream = serf_http2__stream_get(h2, sid, TRUE, TRUE);
+
+ if (stream)
+ body = serf_bucket_prefix_create(
+ body,
+ HTTP2_WINDOW_UPDATE_DATA_SIZE,
+ http2_handle_stream_window_update, stream,
+ h2->allocator);
+ }
+
+ /* Just reading will do the right thing now */
+ process_bucket = body;
+ break;
+
+ /* ---------------------------------------------------- */
+ case HTTP2_FRAME_TYPE_CONTINUATION:
+ if (!h2->continuation_bucket
+ || sid != h2->continuation_streamid)
+ {
+ /* A CONTINUATION frame MUST be preceded by a HEADERS,
+ PUSH_PROMISE or CONTINUATION frame without the
+ END_HEADERS flag set. A recipient that observes
+ violation of this rule MUST respond with a connection
+ error(Section 5.4.1) of type PROTOCOL_ERROR. */
+ h2->continuation_bucket = NULL;
+ h2->continuation_streamid = 0;
+ return SERF_ERROR_HTTP2_PROTOCOL_ERROR;
+ }
+
+ serf_bucket_aggregate_append(h2->continuation_bucket, body);
+
+ if (frameflags & HTTP2_FLAG_END_HEADERS)
+ {
+ h2->continuation_bucket = NULL;
+ h2->continuation_streamid = 0;
+ }
+
+ return APR_SUCCESS;
+
+ /* ---------------------------------------------------- */
+ default:
+ /* We explicitly ignore all other frames as required,
+ so reading will do the right thing now */
+ process_bucket = body;
+ } /* switch */
+
+ if (body)
+ serf_bucket_set_config(body, h2->config);
+
+ SERF_H2_assert(h2->processor == NULL);
+
+ if (process_handler)
+ {
+ h2->processor = process_handler;
+ h2->processor_baton = process_baton;
+ }
+ else
+ {
+ SERF_H2_assert(process_bucket != NULL);
+ h2->processor = http2_bucket_processor;
+ h2->processor_baton = process_bucket;
+ }
+ }
+ } /* while(TRUE) */
}
static apr_status_t
@@ -510,7 +1241,7 @@ http2_protocol_read(serf_connection_t *c
conn->ctx->dirty_pollset = 1;
}
- status = http2_read(conn);
+ status = http2_process(conn->protocol_baton);
if (!status)
return APR_SUCCESS;
@@ -601,8 +1332,8 @@ serf_http2__allocate_stream_id(void *bat
*/
if (stream->streamid < 0)
{
- stream->streamid = stream->h2->next_local_streamid;
- stream->h2->next_local_streamid += 2;
+ stream->streamid = stream->h2->lr_next_streamid;
+ stream->h2->lr_next_streamid += 2;
if (stream->status == H2S_INIT)
stream->status = H2S_IDLE;
@@ -628,7 +1359,7 @@ serf_http2__stream_get(serf_http2_protoc
if (streamid < 0)
return NULL;
- for (stream = h2->first; stream; stream->next)
+ for (stream = h2->first; stream; stream = stream->next)
{
if (stream->streamid == streamid)
{
@@ -640,13 +1371,13 @@ serf_http2__stream_get(serf_http2_protoc
}
if (create_for_remote
- && (streamid & 0x01) == (h2->next_remote_streamid & 0x01))
+ && (streamid & 0x01) == (h2->rl_next_streamid & 0x01))
{
serf_http2_stream_t *rs;
stream = serf_http2__stream_create(h2, streamid,
- h2->default_lr_window,
- h2->default_rl_window,
- h2->conn->allocator);
+ h2->lr_default_window,
+ h2->rl_default_window,
+ h2->allocator);
if (h2->first)
{
@@ -657,10 +1388,10 @@ serf_http2__stream_get(serf_http2_protoc
else
h2->last = h2->first = stream;
- if (streamid < h2->next_remote_streamid)
+ if (streamid < h2->rl_next_streamid)
stream->status = H2S_CLOSED;
else
- h2->next_remote_streamid = (streamid + 2);
+ h2->rl_next_streamid = (streamid + 2);
for (rs = h2->first; rs; rs = rs->next)
{
@@ -681,3 +1412,11 @@ serf_http2__stream_get(serf_http2_protoc
}
return NULL;
}
+
+/* Intended to queue a reset of stream STREAMID on H2 with error REASON.
+   Currently a stub: nothing is enqueued, all arguments are ignored and
+   APR_SUCCESS is always returned. */
+apr_status_t
+serf_http2__enqueue_stream_reset(serf_http2_protocol_t *h2,
+ apr_int32_t streamid,
+ apr_status_t reason)
+{
+ return APR_SUCCESS;
+}
Modified: serf/trunk/protocols/http2_protocol.h
URL: http://svn.apache.org/viewvc/serf/trunk/protocols/http2_protocol.h?rev=1711679&r1=1711678&r2=1711679&view=diff
==============================================================================
--- serf/trunk/protocols/http2_protocol.h (original)
+++ serf/trunk/protocols/http2_protocol.h Sat Oct 31 19:53:57 2015
@@ -24,6 +24,13 @@
#include "serf.h"
#include "serf_private.h"
+/* Debug-only assertion for the http2 code. Maps to assert() when _DEBUG is
+   defined; otherwise expands to a no-op in which X is not evaluated, so X
+   must not have side effects. */
+#ifdef _DEBUG
+#include <assert.h>
+#define SERF_H2_assert(x) assert(x)
+#else
+#define SERF_H2_assert(x) ((void)0)
+#endif
+
#define SERF_LOGHTTP2 \
SERF_LOGCOMP_PROTOCOL, (__FILE__ ":" APR_STRINGIFY(__LINE__))
@@ -34,9 +41,19 @@ extern "C" {
/* ********** HTTP2 Frame types ********** */
/* The standard maximum framesize. Always supported */
-#define HTTP2_DEFAULT_MAX_FRAMESIZE 16384
+#define HTTP2_DEFAULT_MAX_FRAMESIZE 16384
/* The default stream and connection window size before updates */
-#define HTTP2_DEFAULT_WINDOW_SIZE 65535
+#define HTTP2_DEFAULT_WINDOW_SIZE 65535
+/* NOTE(review): presumably the concurrent-stream limit assumed until the
+   peer announces one in a SETTINGS frame (i.e. effectively unlimited);
+   confirm against the code that uses it. */
+#define HTTP2_DEFAULT_MAX_CONCURRENT 0xFFFFFFFF
+
+/* Sizes, in octets, of the fixed-length part of the respective frame
+   payloads (see RFC 7540, section 6).
+   NOTE(review): 'GOWAWAY' looks like a misspelling of GOAWAY; renaming it
+   requires updating every user in the same change. */
+#define HTTP2_PRIORITY_DATA_SIZE 5
+#define HTTP2_RST_DATA_SIZE 4
+#define HTTP2_PROMISE_DATA_SIZE 4
+#define HTTP2_PING_DATA_SIZE 8
+#define HTTP2_GOWAWAY_DATA_SIZE 8
+#define HTTP2_WINDOW_UPDATE_DATA_SIZE 4
+
+/* Presumably the size, in octets, of one identifier/value entry inside a
+   SETTINGS frame payload (RFC 7540, section 6.5.1). */
+#define HTTP2_SETTING_SIZE 6
/* Frame type is an 8 bit unsigned integer */
@@ -87,17 +104,20 @@ extern "C" {
/* ------------------------------------- */
typedef struct serf_http2_protocol_t serf_http2_protocol_t;
+typedef struct serf_http2_stream_data_t serf_http2_stream_data_t;
+
typedef struct serf_http2_stream_t
{
struct serf_http2_protocol_t *h2;
serf_bucket_alloc_t *alloc;
+ /* Opaque implementation details */
+ serf_http2_stream_data_t *data;
+
/* Linked list of currently existing streams */
struct serf_http2_stream_t *next;
struct serf_http2_stream_t *prev;
- serf_request_t *request; /* May be NULL as streams may outlive requests */
-
apr_int64_t lr_window; /* local->remote */
apr_int64_t rl_window; /* remote->local */
@@ -119,6 +139,10 @@ typedef struct serf_http2_stream_t
/* TODO: Priority, etc. */
} serf_http2_stream_t;
+/* Callback type for the handler the protocol installs to process the
+   payload of the frame currently being read. BATON is the value stored
+   next to the callback, H2 the owning protocol instance and BODY the
+   bucket to process.
+   NOTE(review): the exact meaning of BODY and of the returned status is
+   defined by the caller in http2_protocol.c -- confirm there. */
+typedef apr_status_t (* serf_http2_processor_t)(void *baton,
+ serf_http2_protocol_t *h2,
+ serf_bucket_t *body);
+
/* Enqueues an http2 frame for output */
apr_status_t
serf_http2__enqueue_frame(serf_http2_protocol_t *h2,
@@ -133,6 +157,12 @@ serf_http2__stream_create(serf_http2_pro
apr_uint32_t rl_window,
serf_bucket_alloc_t *alloc);
+
+/* Intended to enqueue a reset of stream STREAMID on H2 with error REASON.
+   NOTE: the implementation is currently a stub that ignores its arguments
+   and always returns APR_SUCCESS. */
+apr_status_t
+serf_http2__enqueue_stream_reset(serf_http2_protocol_t *h2,
+ apr_int32_t streamid,
+ apr_status_t reason);
+
/* Allocates a new stream id for a stream.
BATON is a serf_http2_stream_t * instance.
@@ -147,10 +177,10 @@ void
serf_http2__stream_cleanup(serf_http2_stream_t *stream);
serf_http2_stream_t *
-serf_http2__stream_get_by_id(serf_http2_protocol_t *h2,
- apr_int32_t streamid,
- int create_for_remote,
- int move_first);
+serf_http2__stream_get(serf_http2_protocol_t *h2,
+ apr_int32_t streamid,
+ int create_for_remote,
+ int move_first);
/* Sets up STREAM to handle REQUEST */
apr_status_t
@@ -158,7 +188,33 @@ serf_http2__stream_setup_request(serf_ht
serf_hpack_table_t *hpack_tbl,
serf_request_t *request);
+/* Marks STREAM as closed. If LOCAL_RESET is non-zero and STREAM already
+   has a (non-negative) stream id, a stream reset carrying REASON is
+   enqueued via serf_http2__enqueue_stream_reset() and its status is
+   returned; in every other case APR_SUCCESS is returned. */
+apr_status_t
+serf_http2__stream_reset(serf_http2_stream_t *stream,
+ apr_status_t reason,
+ int local_reset);
+
+/* Hands the header block BUCKET of an incoming frame to STREAM.
+   BUCKET is wrapped in an hpack decode bucket (created in ALLOCATOR, using
+   HPACK_TBL and MAX_ENTRY_SIZE) and appended to the stream's response
+   aggregate, which is created on first use and configured with CONFIG.
+   When END_STREAM is non-zero the stream becomes closed if it was
+   half-closed (local), and half-closed (remote) otherwise.
+   FRAMETYPE is currently unused. The current implementation always
+   returns NULL. */
+serf_bucket_t *
+serf_http2__stream_handle_hpack(serf_http2_stream_t *stream,
+ serf_bucket_t *bucket,
+ unsigned char frametype,
+ int end_stream,
+ apr_size_t max_entry_size,
+ serf_hpack_table_t *hpack_tbl,
+ serf_config_t *config,
+ serf_bucket_alloc_t *allocator);
+
+/* Hands the payload BUCKET of an incoming data frame to STREAM.
+   BUCKET is appended to the stream's response aggregate, which is created
+   on first use and configured with CONFIG.
+   When END_STREAM is non-zero the stream becomes closed if it was
+   half-closed (local), and half-closed (remote) otherwise.
+   FRAMETYPE and ALLOCATOR are currently unused. The current
+   implementation always returns NULL. */
+serf_bucket_t *
+serf_http2__stream_handle_data(serf_http2_stream_t *stream,
+ serf_bucket_t *bucket,
+ unsigned char frametype,
+ int end_stream,
+ serf_config_t *config,
+ serf_bucket_alloc_t *allocator);
+/* Processor callback (same signature as serf_http2_processor_t) for a
+   stream; BATON is the serf_http2_stream_t.
+   Returns APR_EAGAIN while the stream has no response aggregate yet.
+   Otherwise reads the aggregate until a read returns a non-zero status
+   and returns that status. */
+apr_status_t
+serf_http2__stream_processor(void *baton,
+ serf_http2_protocol_t *h2,
+ serf_bucket_t *bucket);
#ifdef __cplusplus
}
Modified: serf/trunk/protocols/http2_stream.c
URL: http://svn.apache.org/viewvc/serf/trunk/protocols/http2_stream.c?rev=1711679&r1=1711678&r2=1711679&view=diff
==============================================================================
--- serf/trunk/protocols/http2_stream.c (original)
+++ serf/trunk/protocols/http2_stream.c Sat Oct 31 19:53:57 2015
@@ -29,6 +29,12 @@
#include "protocols/http2_buckets.h"
#include "protocols/http2_protocol.h"
+/* Private per-stream state; reachable through the 'data' member of
+   serf_http2_stream_t, whose layout is only known in this file. */
+struct serf_http2_stream_data_t
+{
+ serf_request_t *request; /* May be NULL as streams may outlive requests */
+ /* Aggregate bucket collecting the incoming header and data buckets of
+    the response. NULL until the first such bucket is handled. */
+ serf_bucket_t *response_agg;
+};
+
serf_http2_stream_t *
serf_http2__stream_create(serf_http2_protocol_t *h2,
apr_int32_t streamid,
@@ -41,8 +47,12 @@ serf_http2__stream_create(serf_http2_pro
stream->h2 = h2;
stream->alloc = alloc;
+ stream->data = serf_bucket_mem_alloc(alloc, sizeof(*stream->data));
+
stream->next = stream->prev = NULL;
- stream->request = NULL;
+
+ stream->data->request = NULL;
+ stream->data->response_agg = NULL;
stream->lr_window = lr_window;
stream->rl_window = rl_window;
@@ -60,6 +70,14 @@ serf_http2__stream_create(serf_http2_pro
+/* Frees STREAM. A still existing response aggregate is destroyed, then the
+   private data block and finally the stream structure itself are returned
+   to STREAM->alloc. STREAM must not be used afterwards. */
void
serf_http2__stream_cleanup(serf_http2_stream_t *stream)
{
+ if (stream->data)
+ {
+ if (stream->data->response_agg)
+ serf_bucket_destroy(stream->data->response_agg);
+
+ serf_bucket_mem_free(stream->alloc, stream->data);
+ stream->data = NULL;
+ }
serf_bucket_mem_free(stream->alloc, stream);
}
@@ -72,7 +90,7 @@ serf_http2__stream_setup_request(serf_ht
serf_bucket_t *hpack;
serf_bucket_t *body;
- stream->request = request;
+ stream->data->request = request;
if (!request->req_bkt)
{
@@ -112,3 +130,145 @@ serf_http2__stream_setup_request(serf_ht
return APR_SUCCESS;
}
+
+apr_status_t
+serf_http2__stream_reset(serf_http2_stream_t *stream,
+                         apr_status_t reason,
+                         int local_reset)
+{
+    /* Whatever happens next, the stream is closed from now on. */
+    stream->status = H2S_CLOSED;
+
+    /* A reset is only enqueued when we initiated it ourselves and the
+       stream already has a stream id assigned. */
+    if (stream->streamid >= 0 && local_reset)
+    {
+        return serf_http2__enqueue_stream_reset(stream->h2,
+                                                stream->streamid,
+                                                reason);
+    }
+
+    return APR_SUCCESS;
+}
+
+/* Hold-open callback installed on a stream's response aggregate bucket.
+   BATON is the serf_http2_stream_t owning the aggregate.
+
+   Returns APR_EOF once the stream is closed or half-closed (remote), and
+   APR_EAGAIN in every other state.
+
+   Only used inside this file, hence static. */
+static apr_status_t
+stream_response_eof(void *baton,
+                    serf_bucket_t *aggregate_bucket)
+{
+    serf_http2_stream_t *stream = baton;
+
+    (void)aggregate_bucket; /* The decision only depends on the stream */
+
+    switch (stream->status)
+    {
+        case H2S_CLOSED:
+        case H2S_HALFCLOSED_REMOTE:
+            return APR_EOF;
+        default:
+            return APR_EAGAIN;
+    }
+}
+
+serf_bucket_t *
+serf_http2__stream_handle_hpack(serf_http2_stream_t *stream,
+                                serf_bucket_t *bucket,
+                                unsigned char frametype,
+                                int end_stream,
+                                apr_size_t max_entry_size,
+                                serf_hpack_table_t *hpack_tbl,
+                                serf_config_t *config,
+                                serf_bucket_alloc_t *allocator)
+{
+    serf_http2_stream_data_t *sd = stream->data;
+    serf_bucket_t *decoder;
+
+    /* The response aggregate is created when the first frame arrives */
+    if (sd->response_agg == NULL)
+    {
+        sd->response_agg = serf_bucket_aggregate_create(stream->alloc);
+        serf_bucket_aggregate_hold_open(sd->response_agg,
+                                        stream_response_eof, stream);
+        serf_bucket_set_config(sd->response_agg, config);
+    }
+
+    /* Queue the header block, wrapped in an hpack decoder */
+    decoder = serf__bucket_hpack_decode_create(bucket, NULL, NULL,
+                                               max_entry_size,
+                                               hpack_tbl, allocator);
+    serf_bucket_aggregate_append(sd->response_agg, decoder);
+
+    /* The remote side is done sending on this stream */
+    if (end_stream)
+    {
+        stream->status = (stream->status == H2S_HALFCLOSED_LOCAL)
+                            ? H2S_CLOSED
+                            : H2S_HALFCLOSED_REMOTE;
+    }
+
+    return NULL;
+}
+
+serf_bucket_t *
+serf_http2__stream_handle_data(serf_http2_stream_t *stream,
+                               serf_bucket_t *bucket,
+                               unsigned char frametype,
+                               int end_stream,
+                               serf_config_t *config,
+                               serf_bucket_alloc_t *allocator)
+{
+    serf_http2_stream_data_t *sd = stream->data;
+
+    /* The response aggregate is created when the first frame arrives */
+    if (sd->response_agg == NULL)
+    {
+        sd->response_agg = serf_bucket_aggregate_create(stream->alloc);
+        serf_bucket_aggregate_hold_open(sd->response_agg,
+                                        stream_response_eof, stream);
+        serf_bucket_set_config(sd->response_agg, config);
+    }
+
+    /* Queue the frame payload as is */
+    serf_bucket_aggregate_append(sd->response_agg, bucket);
+
+    /* The remote side is done sending on this stream */
+    if (end_stream)
+    {
+        stream->status = (stream->status == H2S_HALFCLOSED_LOCAL)
+                            ? H2S_CLOSED
+                            : H2S_HALFCLOSED_REMOTE;
+    }
+
+    return NULL;
+}
+
+/* Processor callback for a stream; BATON is the serf_http2_stream_t.
+
+   Returns APR_EAGAIN while the stream has no response aggregate yet.
+   Otherwise the aggregate is read until a read returns a non-zero status
+   (EOF, EAGAIN or an error), and that status is returned.
+
+   In _DEBUG builds everything that was read is echoed to stdout, with all
+   bytes outside printable ASCII (except CR and LF) replaced by a space.
+   The echo is driven by the length of the data, so embedded NUL bytes no
+   longer truncate the output. In other builds the data is just consumed.
+
+   H2 is currently unused; BUCKET only provides the allocator for the
+   temporary debug copy. */
+apr_status_t
+serf_http2__stream_processor(void *baton,
+                             serf_http2_protocol_t *h2,
+                             serf_bucket_t *bucket)
+{
+    serf_http2_stream_t *stream = baton;
+    apr_status_t status = APR_SUCCESS;
+
+    if (!stream->data->response_agg)
+        return APR_EAGAIN;
+
+    /* ### TODO: Delegate to request */
+    while (!status)
+    {
+        const char *data;
+        apr_size_t len;
+
+        status = serf_bucket_read(stream->data->response_agg,
+                                  SERF_READ_ALL_AVAIL, &data, &len);
+
+#ifdef _DEBUG
+        /* Nothing to show for failed or empty reads */
+        if (!SERF_BUCKET_READ_ERROR(status) && len > 0)
+        {
+            char *printable = serf_bstrmemdup(bucket->allocator, data, len);
+            apr_size_t i;
+
+            for (i = 0; i < len; i++)
+            {
+                char ch = printable[i];
+
+                /* Poor man's iscntrl() */
+                if (((ch < ' ') || (ch > '\x7E'))
+                    && ch != '\r' && ch != '\n')
+                {
+                    printable[i] = ' ';
+                }
+            }
+
+            (void)fwrite(printable, 1, len, stdout);
+
+            serf_bucket_mem_free(bucket->allocator, printable);
+        }
+#endif
+    }
+
+    return status;
+}