Author: rhuijben
Date: Sat Nov 14 09:36:08 2015
New Revision: 1714297

URL: http://svn.apache.org/viewvc?rev=1714297&view=rev
Log:
Redefine the event bucket as a wrapping bucket, to remove dependencies on
implementation details of the 'aggregation' stream around the event bucket.

Track the total number of bytes read and pass it to the event callbacks, to
make the events more useful.

* buckets/event_buckets.c
  (event_context_t): Extend state a bit.
  (serf__bucket_event_create): Add start event and stream arguments.
  (serf_event_read): Implement wrapping and early destroy of stream.
  (serf_event_readline): Remove function. The default implementation will
    work well enough for writing stream scenarios for now.
  (serf_event_read_iovec,
   serf_event_peek,
   serf_event_get_remaining): Implement wrapping and early destroy of stream.
  (serf_bucket_type__event): Update declaration.

* outgoing.c
  (request_writing_done,
   request_writing_finished): Add argument.
  (write_to_connection): Update registration.

* serf_private.h
  (serf_bucket_event_callback_t): Add bytes_read argument.
  (serf__bucket_event_create): Add stream and start_cb arguments.

Modified:
    serf/trunk/buckets/event_buckets.c
    serf/trunk/outgoing.c
    serf/trunk/serf_private.h

Modified: serf/trunk/buckets/event_buckets.c
URL: 
http://svn.apache.org/viewvc/serf/trunk/buckets/event_buckets.c?rev=1714297&r1=1714296&r2=1714297&view=diff
==============================================================================
--- serf/trunk/buckets/event_buckets.c (original)
+++ serf/trunk/buckets/event_buckets.c Sat Nov 14 09:36:08 2015
@@ -25,21 +25,31 @@
 
 typedef struct event_context_t
 {
+    apr_uint64_t bytes_read;
+    serf_bucket_t *stream;
     void *baton;
+    serf_bucket_event_callback_t start_cb;
     serf_bucket_event_callback_t eof_cb;
     serf_bucket_event_callback_t destroy_cb;
+    bool at_eof;
 } event_context_t;
 
 serf_bucket_t *serf__bucket_event_create(
-                          void *baton,
-                          serf_bucket_event_callback_t eof_cb,
-                          serf_bucket_event_callback_t destroy_cb,
-                          serf_bucket_alloc_t *allocator)
+                        serf_bucket_t *stream,
+                        void *baton,
+                        serf_bucket_event_callback_t start_cb,
+                        serf_bucket_event_callback_t eof_cb,
+                        serf_bucket_event_callback_t destroy_cb,
+                        serf_bucket_alloc_t *allocator)
 {
     event_context_t *ctx = serf_bucket_mem_alloc(allocator, sizeof(*ctx));
+    ctx->bytes_read = 0;
+    ctx->stream = stream;
     ctx->baton = baton;
+    ctx->start_cb = start_cb;
     ctx->eof_cb = eof_cb;
     ctx->destroy_cb = destroy_cb;
+    ctx->at_eof = false;
 
     return serf_bucket_create(&serf_bucket_type__event, allocator, ctx);
 }
@@ -50,30 +60,45 @@ static apr_status_t serf_event_read(serf
                                     apr_size_t *len)
 {
     event_context_t *ctx = bucket->data;
-    apr_status_t status = APR_EOF;
-    *data = NULL;
-    *len = 0;
+    apr_status_t status;
 
-    if (ctx->eof_cb)
-        status = ctx->eof_cb(ctx->baton);
+    if (ctx->start_cb) {
+        status = ctx->start_cb(ctx->baton, ctx->bytes_read);
+        ctx->start_cb = NULL;
+
+        if (SERF_BUCKET_READ_ERROR(status))
+            return status;
+    }
+
+    if (ctx->stream && !ctx->at_eof) {
+        status = serf_bucket_read(ctx->stream, requested, data, len);
+    }
+    else {
+        status = APR_EOF;
+        *data = NULL;
+        *len = 0;
+
+        if (ctx->at_eof && ctx->stream) {
+            serf_bucket_destroy(ctx->stream);
+            ctx->stream = NULL;
+        }
+    }
+
+    if (!SERF_BUCKET_READ_ERROR(status)) {
+        ctx->bytes_read += *len;
+
+        if (APR_STATUS_IS_EOF(status)) {
+            ctx->at_eof = true;
+
+            if (ctx->eof_cb) {
+                status = ctx->eof_cb(ctx->baton, ctx->bytes_read);
+                ctx->eof_cb = NULL;
+                status = SERF_BUCKET_READ_ERROR(status) ? status : APR_EOF;
+            }
+        }
+    }
 
-    return status ? status : APR_EOF;
-}
-
-static apr_status_t serf_event_readline(serf_bucket_t *bucket,
-                                        int acceptable, int *found,
-                                        const char **data, apr_size_t *len)
-{
-    event_context_t *ctx = bucket->data;
-    apr_status_t status = APR_EOF;
-    *found = 0;
-    *data = NULL;
-    *len = 0;
-
-    if (ctx->eof_cb)
-        status = ctx->eof_cb(ctx->baton);
-
-    return status ? status : APR_EOF;
+    return status;
 }
 
 static apr_status_t serf_event_read_iovec(serf_bucket_t *bucket,
@@ -83,13 +108,48 @@ static apr_status_t serf_event_read_iove
                                           int *vecs_used)
 {
     event_context_t *ctx = bucket->data;
-    apr_status_t status = APR_EOF;
-    *vecs_used = 0;
+    apr_status_t status;
 
-    if (ctx->eof_cb)
-        status = ctx->eof_cb(ctx->baton);
+    if (ctx->start_cb) {
+        status = ctx->start_cb(ctx->baton, ctx->bytes_read);
+        ctx->start_cb = NULL;
+
+        if (SERF_BUCKET_READ_ERROR(status))
+            return status;
+    }
+
+    if (ctx->stream && !ctx->at_eof) {
+        status = serf_bucket_read_iovec(ctx->stream, requested, vecs_size,
+                                        vecs, vecs_used);
+    }
+    else {
+        status = APR_EOF;
+        *vecs_used = 0;
+
+        if (ctx->at_eof && ctx->stream) {
+            serf_bucket_destroy(ctx->stream);
+            ctx->stream = NULL;
+        }
+    }
+
+    if (!SERF_BUCKET_READ_ERROR(status)) {
+        int i;
+
+        for (i = 0; i < *vecs_used; i++)
+            ctx->bytes_read += vecs[i].iov_len;
+
+        if (APR_STATUS_IS_EOF(status)) {
+            ctx->at_eof = true;
+
+            if (ctx->eof_cb) {
+                status = ctx->eof_cb(ctx->baton, ctx->bytes_read);
+                ctx->eof_cb = NULL;
+                status = SERF_BUCKET_READ_ERROR(status) ? status : APR_EOF;
+            }
+        }
+    }
 
-    return status ? status : APR_EOF;
+    return status;
 }
 
 static apr_status_t serf_event_peek(serf_bucket_t *bucket,
@@ -97,30 +157,57 @@ static apr_status_t serf_event_peek(serf
                                     apr_size_t *len)
 {
     event_context_t *ctx = bucket->data;
-    apr_status_t status = APR_EOF;
-    *data = NULL;
-    *len = 0;
-
-    if (ctx->eof_cb)
-        status = ctx->eof_cb(ctx->baton);
-
-    if (APR_STATUS_IS_EAGAIN(status))
-        return APR_SUCCESS;
-    else
-        return status ? status : APR_EOF;
+    apr_status_t status;
+
+    if (ctx->start_cb) {
+        status = ctx->start_cb(ctx->baton, ctx->bytes_read);
+        ctx->start_cb = NULL;
+
+        if (SERF_BUCKET_READ_ERROR(status))
+            return status;
+    }
+
+    if (ctx->stream && !ctx->at_eof) {
+        status = serf_bucket_peek(ctx->stream, data, len);
+    }
+    else {
+        status = APR_EOF;
+        *len = 0;
+
+        if (ctx->at_eof && ctx->stream) {
+            serf_bucket_destroy(ctx->stream);
+            ctx->stream = NULL;
+        }
+    }
+
+    return status;
 }
 
 static apr_uint64_t serf_event_get_remaining(serf_bucket_t *bucket)
 {
-    return 0;
+    event_context_t *ctx = bucket->data;
+
+    if (ctx->stream && !ctx->at_eof) {
+        return serf_bucket_get_remaining(ctx->stream);
+    }
+    else {
+        if (ctx->at_eof && ctx->stream) {
+            serf_bucket_destroy(ctx->stream);
+            ctx->stream = NULL;
+        }
+        return 0;
+    }
 }
 
 static void serf_event_destroy(serf_bucket_t *bucket)
 {
     event_context_t *ctx = bucket->data;
 
+    if (ctx->stream)
+        serf_bucket_destroy(ctx->stream);
+
     if (ctx->destroy_cb)
-        (void)ctx->destroy_cb(ctx->baton);
+        (void)ctx->destroy_cb(ctx->baton, ctx->bytes_read);
 
     serf_default_destroy_and_data(bucket);
 }
@@ -128,7 +215,7 @@ static void serf_event_destroy(serf_buck
 const serf_bucket_type_t serf_bucket_type__event = {
     "EVENT",
     serf_event_read,
-    serf_event_readline,
+    serf_default_readline,
     serf_event_read_iovec,
     serf_default_read_for_sendfile,
     serf_buckets_are_v2,

Modified: serf/trunk/outgoing.c
URL: 
http://svn.apache.org/viewvc/serf/trunk/outgoing.c?rev=1714297&r1=1714296&r2=1714297&view=diff
==============================================================================
--- serf/trunk/outgoing.c (original)
+++ serf/trunk/outgoing.c Sat Nov 14 09:36:08 2015
@@ -904,7 +904,8 @@ apr_status_t serf__connection_flush(serf
    At this time we know the request is written, but we can't destroy
    the buckets yet as they might still be referenced by the connection
    vecs. */
-static apr_status_t request_writing_done(void *baton)
+static apr_status_t request_writing_done(void *baton,
+                                         apr_uint64_t bytes_read)
 {
   serf_request_t *request = baton;
 
@@ -920,7 +921,8 @@ static apr_status_t request_writing_done
 /* Implements serf_bucket_event_callback_t and is called after the
    request buckets are no longer needed. More precisely the outgoing
    buckets are already destroyed. */
-static apr_status_t request_writing_finished(void *baton)
+static apr_status_t request_writing_finished(void *baton,
+                                             apr_uint64_t bytes_read)
 {
     serf_request_t *request = baton;
     serf_connection_t *conn = request->conn;
@@ -1057,11 +1059,12 @@ static apr_status_t write_to_connection(
             }
 
             request->writing = SERF_WRITING_STARTED;
-            serf_bucket_aggregate_append(ostreamt, request->req_bkt);
 
             /* And now add an event bucket to keep track of when the request
                has been completely written */
-            event_bucket = serf__bucket_event_create(request,
+            event_bucket = serf__bucket_event_create(request->req_bkt,
+                                                     request,
+                                                     NULL,
                                                      request_writing_done,
                                                      request_writing_finished,
                                                      conn->allocator);
@@ -1395,6 +1398,23 @@ apr_status_t serf__process_connection(se
                                       apr_int16_t events)
 {
     apr_status_t status;
+#ifdef SERF_DEBUG_BUCKET_USE
+      serf_request_t *rq;
+#endif
+
+#ifdef SERF_DEBUG_BUCKET_USE
+    serf_debug__entered_loop(conn->allocator);
+
+    for (rq = conn->written_reqs; rq; rq = rq->next) {
+          if (rq->allocator)
+              serf_debug__entered_loop(rq->allocator);
+    }
+
+    for (rq = conn->done_reqs; rq; rq = rq->next) {
+          if (rq->allocator)
+              serf_debug__entered_loop(rq->allocator);
+    }
+#endif
 
     /* POLLHUP/ERR should come after POLLIN so if there's an error message or
      * the like sitting on the connection, we give the app a chance to read

Modified: serf/trunk/serf_private.h
URL: 
http://svn.apache.org/viewvc/serf/trunk/serf_private.h?rev=1714297&r1=1714296&r2=1714297&view=diff
==============================================================================
--- serf/trunk/serf_private.h (original)
+++ serf/trunk/serf_private.h Sat Nov 14 09:36:08 2015
@@ -667,14 +667,16 @@ int serf__log_enabled(apr_uint32_t level
 extern const serf_bucket_type_t serf_bucket_type__event;
 #define SERF__BUCKET_IS_EVENT(b) SERF_BUCKET_CHECK((b), _event)
 
-typedef apr_status_t(*serf_bucket_event_callback_t)(void *baton);
+typedef apr_status_t(*serf_bucket_event_callback_t)(void *baton,
+                                                    apr_uint64_t bytes_read);
 
 serf_bucket_t *serf__bucket_event_create(
+                        serf_bucket_t *stream,
                         void *baton,
+                        serf_bucket_event_callback_t start_cb,
                         serf_bucket_event_callback_t eof_cb,
                         serf_bucket_event_callback_t destroy_cb,
                         serf_bucket_alloc_t *allocator);
 
 
-
 #endif


Reply via email to