Author: rhuijben
Date: Sat Nov 14 09:36:08 2015
New Revision: 1714297
URL: http://svn.apache.org/viewvc?rev=1714297&view=rev
Log:
Redefine the event bucket as a wrapping bucket, to remove dependencies on
implementation details of the 'aggregation' stream around the event bucket.
Calculate the total number of bytes read, to make the events more useful.
* buckets/event_buckets.c
(event_context_t): Extend state a bit.
(serf__bucket_event_create): Add start event and stream arguments.
(serf_event_read): Implement wrapping and early destroy of stream.
(serf_event_readline): Remove function. The default implementation will
work well enough for writing stream scenarios for now.
(serf_event_read_iovec,
serf_event_peek,
serf_event_get_remaining): Implement wrapping and early destroy of stream.
(serf_bucket_type__event): Update declaration.
* outgoing.c
(request_writing_done,
request_writing_finished): Add bytes_read argument.
(write_to_connection): Update registration.
* serf_private.h
(serf_bucket_event_callback_t): Add bytes_read argument.
(serf__bucket_event_create): Add stream and start_cb arguments.
Modified:
serf/trunk/buckets/event_buckets.c
serf/trunk/outgoing.c
serf/trunk/serf_private.h
Modified: serf/trunk/buckets/event_buckets.c
URL:
http://svn.apache.org/viewvc/serf/trunk/buckets/event_buckets.c?rev=1714297&r1=1714296&r2=1714297&view=diff
==============================================================================
--- serf/trunk/buckets/event_buckets.c (original)
+++ serf/trunk/buckets/event_buckets.c Sat Nov 14 09:36:08 2015
@@ -25,21 +25,31 @@
typedef struct event_context_t
{
+ apr_uint64_t bytes_read;
+ serf_bucket_t *stream;
void *baton;
+ serf_bucket_event_callback_t start_cb;
serf_bucket_event_callback_t eof_cb;
serf_bucket_event_callback_t destroy_cb;
+ bool at_eof;
} event_context_t;
serf_bucket_t *serf__bucket_event_create(
- void *baton,
- serf_bucket_event_callback_t eof_cb,
- serf_bucket_event_callback_t destroy_cb,
- serf_bucket_alloc_t *allocator)
+ serf_bucket_t *stream,
+ void *baton,
+ serf_bucket_event_callback_t start_cb,
+ serf_bucket_event_callback_t eof_cb,
+ serf_bucket_event_callback_t destroy_cb,
+ serf_bucket_alloc_t *allocator)
{
event_context_t *ctx = serf_bucket_mem_alloc(allocator, sizeof(*ctx));
+ ctx->bytes_read = 0;
+ ctx->stream = stream;
ctx->baton = baton;
+ ctx->start_cb = start_cb;
ctx->eof_cb = eof_cb;
ctx->destroy_cb = destroy_cb;
+ ctx->at_eof = false;
return serf_bucket_create(&serf_bucket_type__event, allocator, ctx);
}
@@ -50,30 +60,45 @@ static apr_status_t serf_event_read(serf
apr_size_t *len)
{
event_context_t *ctx = bucket->data;
- apr_status_t status = APR_EOF;
- *data = NULL;
- *len = 0;
+ apr_status_t status;
- if (ctx->eof_cb)
- status = ctx->eof_cb(ctx->baton);
+ if (ctx->start_cb) {
+ status = ctx->start_cb(ctx->baton, ctx->bytes_read);
+ ctx->start_cb = NULL;
+
+ if (SERF_BUCKET_READ_ERROR(status))
+ return status;
+ }
+
+ if (ctx->stream && !ctx->at_eof) {
+ status = serf_bucket_read(ctx->stream, requested, data, len);
+ }
+ else {
+ status = APR_EOF;
+ *data = NULL;
+ *len = 0;
+
+ if (ctx->at_eof && ctx->stream) {
+ serf_bucket_destroy(ctx->stream);
+ ctx->stream = NULL;
+ }
+ }
+
+ if (!SERF_BUCKET_READ_ERROR(status)) {
+ ctx->bytes_read += *len;
+
+ if (APR_STATUS_IS_EOF(status)) {
+ ctx->at_eof = true;
+
+ if (ctx->eof_cb) {
+ status = ctx->eof_cb(ctx->baton, ctx->bytes_read);
+ ctx->eof_cb = NULL;
+ status = SERF_BUCKET_READ_ERROR(status) ? status : APR_EOF;
+ }
+ }
+ }
- return status ? status : APR_EOF;
-}
-
-static apr_status_t serf_event_readline(serf_bucket_t *bucket,
- int acceptable, int *found,
- const char **data, apr_size_t *len)
-{
- event_context_t *ctx = bucket->data;
- apr_status_t status = APR_EOF;
- *found = 0;
- *data = NULL;
- *len = 0;
-
- if (ctx->eof_cb)
- status = ctx->eof_cb(ctx->baton);
-
- return status ? status : APR_EOF;
+ return status;
}
static apr_status_t serf_event_read_iovec(serf_bucket_t *bucket,
@@ -83,13 +108,48 @@ static apr_status_t serf_event_read_iove
int *vecs_used)
{
event_context_t *ctx = bucket->data;
- apr_status_t status = APR_EOF;
- *vecs_used = 0;
+ apr_status_t status;
- if (ctx->eof_cb)
- status = ctx->eof_cb(ctx->baton);
+ if (ctx->start_cb) {
+ status = ctx->start_cb(ctx->baton, ctx->bytes_read);
+ ctx->start_cb = NULL;
+
+ if (SERF_BUCKET_READ_ERROR(status))
+ return status;
+ }
+
+ if (ctx->stream && !ctx->at_eof) {
+ status = serf_bucket_read_iovec(ctx->stream, requested, vecs_size,
+ vecs, vecs_used);
+ }
+ else {
+ status = APR_EOF;
+ *vecs_used = 0;
+
+ if (ctx->at_eof && ctx->stream) {
+ serf_bucket_destroy(ctx->stream);
+ ctx->stream = NULL;
+ }
+ }
+
+ if (!SERF_BUCKET_READ_ERROR(status)) {
+ int i;
+
+ for (i = 0; i < *vecs_used; i++)
+ ctx->bytes_read += vecs[i].iov_len;
+
+ if (APR_STATUS_IS_EOF(status)) {
+ ctx->at_eof = true;
+
+ if (ctx->eof_cb) {
+ status = ctx->eof_cb(ctx->baton, ctx->bytes_read);
+ ctx->eof_cb = NULL;
+ status = SERF_BUCKET_READ_ERROR(status) ? status : APR_EOF;
+ }
+ }
+ }
- return status ? status : APR_EOF;
+ return status;
}
static apr_status_t serf_event_peek(serf_bucket_t *bucket,
@@ -97,30 +157,57 @@ static apr_status_t serf_event_peek(serf
apr_size_t *len)
{
event_context_t *ctx = bucket->data;
- apr_status_t status = APR_EOF;
- *data = NULL;
- *len = 0;
-
- if (ctx->eof_cb)
- status = ctx->eof_cb(ctx->baton);
-
- if (APR_STATUS_IS_EAGAIN(status))
- return APR_SUCCESS;
- else
- return status ? status : APR_EOF;
+ apr_status_t status;
+
+ if (ctx->start_cb) {
+ status = ctx->start_cb(ctx->baton, ctx->bytes_read);
+ ctx->start_cb = NULL;
+
+ if (SERF_BUCKET_READ_ERROR(status))
+ return status;
+ }
+
+ if (ctx->stream && !ctx->at_eof) {
+ status = serf_bucket_peek(ctx->stream, data, len);
+ }
+ else {
+ status = APR_EOF;
+ *len = 0;
+
+ if (ctx->at_eof && ctx->stream) {
+ serf_bucket_destroy(ctx->stream);
+ ctx->stream = NULL;
+ }
+ }
+
+ return status;
}
static apr_uint64_t serf_event_get_remaining(serf_bucket_t *bucket)
{
- return 0;
+ event_context_t *ctx = bucket->data;
+
+ if (ctx->stream && !ctx->at_eof) {
+ return serf_bucket_get_remaining(ctx->stream);
+ }
+ else {
+ if (ctx->at_eof && ctx->stream) {
+ serf_bucket_destroy(ctx->stream);
+ ctx->stream = NULL;
+ }
+ return 0;
+ }
}
static void serf_event_destroy(serf_bucket_t *bucket)
{
event_context_t *ctx = bucket->data;
+ if (ctx->stream)
+ serf_bucket_destroy(ctx->stream);
+
if (ctx->destroy_cb)
- (void)ctx->destroy_cb(ctx->baton);
+ (void)ctx->destroy_cb(ctx->baton, ctx->bytes_read);
serf_default_destroy_and_data(bucket);
}
@@ -128,7 +215,7 @@ static void serf_event_destroy(serf_buck
const serf_bucket_type_t serf_bucket_type__event = {
"EVENT",
serf_event_read,
- serf_event_readline,
+ serf_default_readline,
serf_event_read_iovec,
serf_default_read_for_sendfile,
serf_buckets_are_v2,
Modified: serf/trunk/outgoing.c
URL:
http://svn.apache.org/viewvc/serf/trunk/outgoing.c?rev=1714297&r1=1714296&r2=1714297&view=diff
==============================================================================
--- serf/trunk/outgoing.c (original)
+++ serf/trunk/outgoing.c Sat Nov 14 09:36:08 2015
@@ -904,7 +904,8 @@ apr_status_t serf__connection_flush(serf
At this time we know the request is written, but we can't destroy
the buckets yet as they might still be referenced by the connection
vecs. */
-static apr_status_t request_writing_done(void *baton)
+static apr_status_t request_writing_done(void *baton,
+ apr_uint64_t bytes_read)
{
serf_request_t *request = baton;
@@ -920,7 +921,8 @@ static apr_status_t request_writing_done
/* Implements serf_bucket_event_callback_t and is called after the
request buckets are no longer needed. More precisely the outgoing
buckets are already destroyed. */
-static apr_status_t request_writing_finished(void *baton)
+static apr_status_t request_writing_finished(void *baton,
+ apr_uint64_t bytes_read)
{
serf_request_t *request = baton;
serf_connection_t *conn = request->conn;
@@ -1057,11 +1059,12 @@ static apr_status_t write_to_connection(
}
request->writing = SERF_WRITING_STARTED;
- serf_bucket_aggregate_append(ostreamt, request->req_bkt);
/* And now add an event bucket to keep track of when the request
has been completely written */
- event_bucket = serf__bucket_event_create(request,
+ event_bucket = serf__bucket_event_create(request->req_bkt,
+ request,
+ NULL,
request_writing_done,
request_writing_finished,
conn->allocator);
@@ -1395,6 +1398,23 @@ apr_status_t serf__process_connection(se
apr_int16_t events)
{
apr_status_t status;
+#ifdef SERF_DEBUG_BUCKET_USE
+ serf_request_t *rq;
+#endif
+
+#ifdef SERF_DEBUG_BUCKET_USE
+ serf_debug__entered_loop(conn->allocator);
+
+ for (rq = conn->written_reqs; rq; rq = rq->next) {
+ if (rq->allocator)
+ serf_debug__entered_loop(rq->allocator);
+ }
+
+ for (rq = conn->done_reqs; rq; rq = rq->next) {
+ if (rq->allocator)
+ serf_debug__entered_loop(rq->allocator);
+ }
+#endif
/* POLLHUP/ERR should come after POLLIN so if there's an error message or
* the like sitting on the connection, we give the app a chance to read
Modified: serf/trunk/serf_private.h
URL:
http://svn.apache.org/viewvc/serf/trunk/serf_private.h?rev=1714297&r1=1714296&r2=1714297&view=diff
==============================================================================
--- serf/trunk/serf_private.h (original)
+++ serf/trunk/serf_private.h Sat Nov 14 09:36:08 2015
@@ -667,14 +667,16 @@ int serf__log_enabled(apr_uint32_t level
extern const serf_bucket_type_t serf_bucket_type__event;
#define SERF__BUCKET_IS_EVENT(b) SERF_BUCKET_CHECK((b), _event)
-typedef apr_status_t(*serf_bucket_event_callback_t)(void *baton);
+typedef apr_status_t(*serf_bucket_event_callback_t)(void *baton,
+ apr_uint64_t bytes_read);
serf_bucket_t *serf__bucket_event_create(
+ serf_bucket_t *stream,
void *baton,
+ serf_bucket_event_callback_t start_cb,
serf_bucket_event_callback_t eof_cb,
serf_bucket_event_callback_t destroy_cb,
serf_bucket_alloc_t *allocator);
-
#endif