Author: rhuijben Date: Sat Nov 14 09:36:08 2015 New Revision: 1714297 URL: http://svn.apache.org/viewvc?rev=1714297&view=rev Log: Redefine the event bucket as a wrapping bucket, to remove dependencies on implementation details on the 'aggregation' stream around the event bucket.
Calculate total number of bytes read to increase event value. * buckets/event_buckets.c (event_context_t): Extend state a bit. (serf__bucket_event_create): Add start event and stream arguments. (serf_event_read): Implement wrapping and early destroy of stream. (serf_event_readline): Remove function. The default implementation will work good enough for writing stream scenarios for now. (serf_event_read_iovec, serf_event_peek, serf_event_get_remaining): Implement wrapping and early destroy of stream. (serf_bucket_type__event): Update declaration. * outgoing.c (request_writing_done, request_writing_finished): Add argument. (write_to_connection): Update registration. * serf_private.h (serf_bucket_event_callback_t): Add bytes_read argument. (serf__bucket_event_create): Add stream and start_cb arguments. Modified: serf/trunk/buckets/event_buckets.c serf/trunk/outgoing.c serf/trunk/serf_private.h Modified: serf/trunk/buckets/event_buckets.c URL: http://svn.apache.org/viewvc/serf/trunk/buckets/event_buckets.c?rev=1714297&r1=1714296&r2=1714297&view=diff ============================================================================== --- serf/trunk/buckets/event_buckets.c (original) +++ serf/trunk/buckets/event_buckets.c Sat Nov 14 09:36:08 2015 @@ -25,21 +25,31 @@ typedef struct event_context_t { + apr_uint64_t bytes_read; + serf_bucket_t *stream; void *baton; + serf_bucket_event_callback_t start_cb; serf_bucket_event_callback_t eof_cb; serf_bucket_event_callback_t destroy_cb; + bool at_eof; } event_context_t; serf_bucket_t *serf__bucket_event_create( - void *baton, - serf_bucket_event_callback_t eof_cb, - serf_bucket_event_callback_t destroy_cb, - serf_bucket_alloc_t *allocator) + serf_bucket_t *stream, + void *baton, + serf_bucket_event_callback_t start_cb, + serf_bucket_event_callback_t eof_cb, + serf_bucket_event_callback_t destroy_cb, + serf_bucket_alloc_t *allocator) { event_context_t *ctx = serf_bucket_mem_alloc(allocator, sizeof(*ctx)); + ctx->bytes_read = 0; + ctx->stream = stream; ctx->baton = baton; + ctx->start_cb = start_cb; ctx->eof_cb = eof_cb; ctx->destroy_cb = destroy_cb; + ctx->at_eof = false; return serf_bucket_create(&serf_bucket_type__event, allocator, ctx); } @@ -50,30 +60,45 @@ static apr_status_t serf_event_read(serf apr_size_t *len) { event_context_t *ctx = bucket->data; - apr_status_t status = APR_EOF; - *data = NULL; - *len = 0; + apr_status_t status; - if (ctx->eof_cb) - status = ctx->eof_cb(ctx->baton); + if (ctx->start_cb) { + status = ctx->start_cb(ctx->baton, ctx->bytes_read); + ctx->start_cb = NULL; + + if (SERF_BUCKET_READ_ERROR(status)) + return status; + } + + if (ctx->stream && !ctx->at_eof) { + status = serf_bucket_read(ctx->stream, requested, data, len); + } + else { + status = APR_EOF; + *data = NULL; + *len = 0; + + if (ctx->at_eof && ctx->stream) { + serf_bucket_destroy(ctx->stream); + ctx->stream = NULL; + } + } + + if (!SERF_BUCKET_READ_ERROR(status)) { + ctx->bytes_read += *len; + + if (APR_STATUS_IS_EOF(status)) { + ctx->at_eof = true; + + if (ctx->eof_cb) { + status = ctx->eof_cb(ctx->baton, ctx->bytes_read); + ctx->eof_cb = NULL; + status = SERF_BUCKET_READ_ERROR(status) ? status : APR_EOF; + } + } + } - return status ? status : APR_EOF; -} - -static apr_status_t serf_event_readline(serf_bucket_t *bucket, - int acceptable, int *found, - const char **data, apr_size_t *len) -{ - event_context_t *ctx = bucket->data; - apr_status_t status = APR_EOF; - *found = 0; - *data = NULL; - *len = 0; - - if (ctx->eof_cb) - status = ctx->eof_cb(ctx->baton); - - return status ? status : APR_EOF; + return status; } static apr_status_t serf_event_read_iovec(serf_bucket_t *bucket, @@ -83,13 +108,48 @@ static apr_status_t serf_event_read_iove int *vecs_used) { event_context_t *ctx = bucket->data; - apr_status_t status = APR_EOF; - *vecs_used = 0; + apr_status_t status; - if (ctx->eof_cb) - status = ctx->eof_cb(ctx->baton); + if (ctx->start_cb) { + status = ctx->start_cb(ctx->baton, ctx->bytes_read); + ctx->start_cb = NULL; + + if (SERF_BUCKET_READ_ERROR(status)) + return status; + } + + if (ctx->stream && !ctx->at_eof) { + status = serf_bucket_read_iovec(ctx->stream, requested, vecs_size, + vecs, vecs_used); + } + else { + status = APR_EOF; + *vecs_used = 0; + + if (ctx->at_eof && ctx->stream) { + serf_bucket_destroy(ctx->stream); + ctx->stream = NULL; + } + } + + if (!SERF_BUCKET_READ_ERROR(status)) { + int i; + + for (i = 0; i < *vecs_used; i++) + ctx->bytes_read += vecs[i].iov_len; + + if (APR_STATUS_IS_EOF(status)) { + ctx->at_eof = true; + + if (ctx->eof_cb) { + status = ctx->eof_cb(ctx->baton, ctx->bytes_read); + ctx->eof_cb = NULL; + status = SERF_BUCKET_READ_ERROR(status) ? status : APR_EOF; + } + } + } - return status ? status : APR_EOF; + return status; } static apr_status_t serf_event_peek(serf_bucket_t *bucket, @@ -97,30 +157,57 @@ static apr_status_t serf_event_peek(serf apr_size_t *len) { event_context_t *ctx = bucket->data; - apr_status_t status = APR_EOF; - *data = NULL; - *len = 0; - - if (ctx->eof_cb) - status = ctx->eof_cb(ctx->baton); - - if (APR_STATUS_IS_EAGAIN(status)) - return APR_SUCCESS; - else - return status ? status : APR_EOF; + apr_status_t status; + + if (ctx->start_cb) { + status = ctx->start_cb(ctx->baton, ctx->bytes_read); + ctx->start_cb = NULL; + + if (SERF_BUCKET_READ_ERROR(status)) + return status; + } + + if (ctx->stream && !ctx->at_eof) { + status = serf_bucket_peek(ctx->stream, data, len); + } + else { + status = APR_EOF; + *len = 0; + + if (ctx->at_eof && ctx->stream) { + serf_bucket_destroy(ctx->stream); + ctx->stream = NULL; + } + } + + return status; } static apr_uint64_t serf_event_get_remaining(serf_bucket_t *bucket) { - return 0; + event_context_t *ctx = bucket->data; + + if (ctx->stream && !ctx->at_eof) { + return serf_bucket_get_remaining(ctx->stream); + } + else { + if (ctx->at_eof && ctx->stream) { + serf_bucket_destroy(ctx->stream); + ctx->stream = NULL; + } + return 0; + } } static void serf_event_destroy(serf_bucket_t *bucket) { event_context_t *ctx = bucket->data; + if (ctx->stream) + serf_bucket_destroy(ctx->stream); + if (ctx->destroy_cb) - (void)ctx->destroy_cb(ctx->baton); + (void)ctx->destroy_cb(ctx->baton, ctx->bytes_read); serf_default_destroy_and_data(bucket); } @@ -128,7 +215,7 @@ static void serf_event_destroy(serf_buck const serf_bucket_type_t serf_bucket_type__event = { "EVENT", serf_event_read, - serf_event_readline, + serf_default_readline, serf_event_read_iovec, serf_default_read_for_sendfile, serf_buckets_are_v2, Modified: serf/trunk/outgoing.c URL: http://svn.apache.org/viewvc/serf/trunk/outgoing.c?rev=1714297&r1=1714296&r2=1714297&view=diff ============================================================================== --- serf/trunk/outgoing.c (original) +++ serf/trunk/outgoing.c Sat Nov 14 09:36:08 2015 @@ -904,7 +904,8 @@ apr_status_t serf__connection_flush(serf At this time we know the request is written, but we can't destroy the buckets yet as they might still be referenced by the connection vecs. */ -static apr_status_t request_writing_done(void *baton) +static apr_status_t request_writing_done(void *baton, + apr_uint64_t bytes_read) { serf_request_t *request = baton; @@ -920,7 +921,8 @@ static apr_status_t request_writing_done /* Implements serf_bucket_event_callback_t and is called after the request buckets are no longer needed. More precisely the outgoing buckets are already destroyed. */ -static apr_status_t request_writing_finished(void *baton) +static apr_status_t request_writing_finished(void *baton, + apr_uint64_t bytes_read) { serf_request_t *request = baton; serf_connection_t *conn = request->conn; @@ -1057,11 +1059,12 @@ static apr_status_t write_to_connection( } request->writing = SERF_WRITING_STARTED; - serf_bucket_aggregate_append(ostreamt, request->req_bkt); /* And now add an event bucket to keep track of when the request has been completely written */ - event_bucket = serf__bucket_event_create(request, + event_bucket = serf__bucket_event_create(request->req_bkt, + request, + NULL, request_writing_done, request_writing_finished, conn->allocator); @@ -1395,6 +1398,23 @@ apr_status_t serf__process_connection(se apr_int16_t events) { apr_status_t status; +#ifdef SERF_DEBUG_BUCKET_USE + serf_request_t *rq; +#endif + +#ifdef SERF_DEBUG_BUCKET_USE + serf_debug__entered_loop(conn->allocator); + + for (rq = conn->written_reqs; rq; rq = rq->next) { + if (rq->allocator) + serf_debug__entered_loop(rq->allocator); + } + + for (rq = conn->done_reqs; rq; rq = rq->next) { + if (rq->allocator) + serf_debug__entered_loop(rq->allocator); + } +#endif /* POLLHUP/ERR should come after POLLIN so if there's an error message or * the like sitting on the connection, we give the app a chance to read Modified: serf/trunk/serf_private.h URL: http://svn.apache.org/viewvc/serf/trunk/serf_private.h?rev=1714297&r1=1714296&r2=1714297&view=diff ============================================================================== --- serf/trunk/serf_private.h (original) +++ serf/trunk/serf_private.h Sat Nov 14 09:36:08 2015 @@ -667,14 +667,16 @@ int serf__log_enabled(apr_uint32_t level extern const serf_bucket_type_t serf_bucket_type__event; #define SERF__BUCKET_IS_EVENT(b) SERF_BUCKET_CHECK((b), _event) -typedef apr_status_t(*serf_bucket_event_callback_t)(void *baton); +typedef apr_status_t(*serf_bucket_event_callback_t)(void *baton, + apr_uint64_t bytes_read); serf_bucket_t *serf__bucket_event_create( + serf_bucket_t *stream, void *baton, + serf_bucket_event_callback_t start_cb, serf_bucket_event_callback_t eof_cb, serf_bucket_event_callback_t destroy_cb, serf_bucket_alloc_t *allocator); - #endif