Author: rhuijben
Date: Mon Nov  9 17:49:59 2015
New Revision: 1713489

URL: http://svn.apache.org/viewvc?rev=1713489&view=rev
Log:
Replace the track bucket in the request writing with a tiny bit more
advanced event bucket that tracks both writing done and destroyed. The
timings of these callbacks will allow simplifying some logic introduced
in r1712776.

For now declare the event bucket as a private type.

* buckets/event_buckets.c
  New file.

* outgoing.c
  (request_writing_done): New function.
  (request_writing_finished): Tweak to implement serf_bucket_event_callback_t.
  (write_to_connection): Add event bucket directly after the request bucket,
    instead of an aggregate when the writing is done.

* serf_private.h
  (serf_bucket_type__event): New bucket type.
  (SERF__BUCKET_IS_EVENT): New define.
  (serf_bucket_event_callback_t): New typedef.
  (serf__bucket_event_create): New function.

Added:
    serf/trunk/buckets/event_buckets.c   (with props)
Modified:
    serf/trunk/outgoing.c
    serf/trunk/serf_private.h

Added: serf/trunk/buckets/event_buckets.c
URL: 
http://svn.apache.org/viewvc/serf/trunk/buckets/event_buckets.c?rev=1713489&view=auto
==============================================================================
--- serf/trunk/buckets/event_buckets.c (added)
+++ serf/trunk/buckets/event_buckets.c Mon Nov  9 17:49:59 2015
@@ -0,0 +1,141 @@
+/* ====================================================================
+ *    Licensed to the Apache Software Foundation (ASF) under one
+ *    or more contributor license agreements.  See the NOTICE file
+ *    distributed with this work for additional information
+ *    regarding copyright ownership.  The ASF licenses this file
+ *    to you under the Apache License, Version 2.0 (the
+ *    "License"); you may not use this file except in compliance
+ *    with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *    Unless required by applicable law or agreed to in writing,
+ *    software distributed under the License is distributed on an
+ *    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *    KIND, either express or implied.  See the License for the
+ *    specific language governing permissions and limitations
+ *    under the License.
+ * ====================================================================
+ */
+
+#include "serf.h"
+#include "serf_bucket_util.h"
+#include "serf_private.h"
+
+typedef apr_status_t(*serf_bucket_event_callback_t)(void *baton);
+
+typedef struct event_context_t
+{
+    void *baton;
+    serf_bucket_event_callback_t eof_cb;
+    serf_bucket_event_callback_t destroy_cb;
+} event_context_t;
+
+serf_bucket_t *serf__bucket_event_create(
+                          void *baton,
+                          serf_bucket_event_callback_t eof_cb,
+                          serf_bucket_event_callback_t destroy_cb,
+                          serf_bucket_alloc_t *allocator)
+{
+    event_context_t *ctx = serf_bucket_mem_alloc(allocator, sizeof(*ctx));
+    ctx->baton = baton;
+    ctx->eof_cb = eof_cb;
+    ctx->destroy_cb = destroy_cb;
+
+    return serf_bucket_create(&serf_bucket_type__event, allocator, ctx);
+}
+
+static apr_status_t serf_event_read(serf_bucket_t *bucket,
+                                    apr_size_t requested,
+                                    const char **data,
+                                    apr_size_t *len)
+{
+    event_context_t *ctx = bucket->data;
+    apr_status_t status = APR_EOF;
+    *data = NULL;
+    *len = 0;
+
+    if (ctx->eof_cb)
+        status = ctx->eof_cb(ctx->baton);
+
+    return status ? status : APR_EOF;
+}
+
+static apr_status_t serf_event_readline(serf_bucket_t *bucket,
+                                        int acceptable, int *found,
+                                        const char **data, apr_size_t *len)
+{
+    event_context_t *ctx = bucket->data;
+    apr_status_t status = APR_EOF;
+    *found = 0;
+    *data = NULL;
+    *len = 0;
+
+    if (ctx->eof_cb)
+        status = ctx->eof_cb(ctx->baton);
+
+    return status ? status : APR_EOF;
+}
+
+static apr_status_t serf_event_read_iovec(serf_bucket_t *bucket,
+                                          apr_size_t requested,
+                                          int vecs_size,
+                                          struct iovec *vecs,
+                                          int *vecs_used)
+{
+    event_context_t *ctx = bucket->data;
+    apr_status_t status = APR_EOF;
+    *vecs_used = 0;
+
+    if (ctx->eof_cb)
+        status = ctx->eof_cb(ctx->baton);
+
+    return status ? status : APR_EOF;
+}
+
+static apr_status_t serf_event_peek(serf_bucket_t *bucket,
+                                    const char **data,
+                                    apr_size_t *len)
+{
+    event_context_t *ctx = bucket->data;
+    apr_status_t status = APR_EOF;
+    *data = NULL;
+    *len = 0;
+
+    if (ctx->eof_cb)
+        status = ctx->eof_cb(ctx->baton);
+
+    if (APR_STATUS_IS_EAGAIN(status))
+        return APR_SUCCESS;
+    else
+        return status ? status : APR_EOF;
+}
+
+static apr_uint64_t serf_event_get_remaining(serf_bucket_t *bucket)
+{
+    return 0;
+}
+
+static void serf_event_destroy(serf_bucket_t *bucket)
+{
+    event_context_t *ctx = bucket->data;
+
+    if (ctx->destroy_cb)
+        (void)ctx->destroy_cb(ctx->baton);
+
+    serf_default_destroy_and_data(bucket);
+}
+
+const serf_bucket_type_t serf_bucket_type__event = {
+    "EVENT",
+    serf_event_read,
+    serf_event_readline,
+    serf_event_read_iovec,
+    serf_default_read_for_sendfile,
+    serf_buckets_are_v2,
+    serf_event_peek,
+    serf_event_destroy,
+    serf_default_read_bucket,
+    serf_event_get_remaining,
+    serf_default_ignore_config,
+};

Propchange: serf/trunk/buckets/event_buckets.c
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: serf/trunk/outgoing.c
URL: 
http://svn.apache.org/viewvc/serf/trunk/outgoing.c?rev=1713489&r1=1713488&r2=1713489&view=diff
==============================================================================
--- serf/trunk/outgoing.c (original)
+++ serf/trunk/outgoing.c Mon Nov  9 17:49:59 2015
@@ -902,18 +902,39 @@ apr_status_t serf__connection_flush(serf
     return status;
 }
 
+/* Implements serf_bucket_event_callback_t and is called (potentially
+   more than once) after the request buckets are completely read.
 
-/* Implements serf_bucket_aggregate_eof_t to mark that the request that is
-   already DONE writing has actually FINISHED writing. */
-static apr_status_t request_writing_finished(void *baton,
-                                             serf_bucket_t *aggregate_bucket)
+   At this time we know the request is written, but we can't destroy
+   the buckets yet as they might still be referenced by the connection
+   vecs. */
+static apr_status_t request_writing_done(void *baton)
+{
+  serf_request_t *request = baton;
+
+  if (request->writing == SERF_WRITING_STARTED) {
+      request->writing = SERF_WRITING_DONE;
+
+      /* TODO: Handle request done */
+  }
+  return APR_EOF; /* Done with the event bucket */
+}
+
+
+/* Implements serf_bucket_event_callback_t and is called after the
+   request buckets are no longer needed. More precisely the outgoing
+   buckets are already destroyed. */
+static apr_status_t request_writing_finished(void *baton)
 {
     serf_request_t *request = baton;
 
-    if (request->writing == SERF_WRITING_DONE)
-    request->writing = SERF_WRITING_FINISHED;
+    if (request->writing == SERF_WRITING_DONE) {
+        request->writing = SERF_WRITING_FINISHED;
+
+        /* TODO: Destroy request if we no longer need it */
+    }
 
-    return APR_EOF;
+    return APR_EOF; /* Done with event bucket. Status is ignored */
 }
 
 /* write data out to the connection */
@@ -989,6 +1010,8 @@ static apr_status_t write_to_connection(
         }
 
         if (request && request->writing == SERF_WRITING_NONE) {
+            serf_bucket_t *event_bucket;
+
             if (request->req_bkt == NULL) {
                 read_status = serf__setup_request(request);
                 if (read_status) {
@@ -999,6 +1022,14 @@ static apr_status_t write_to_connection(
 
             request->writing = SERF_WRITING_STARTED;
             serf_bucket_aggregate_append(ostreamt, request->req_bkt);
+
+            /* And now add an event bucket to keep track of when the request
+               has been completely written */
+            event_bucket = serf_bucket_event_create(request,
+                                                    request_writing_done,
+                                                    request_writing_finished,
+                                                    conn->allocator);
+            serf_bucket_aggregate_append(ostreamt, event_bucket);
         }
 
         /* If we got some data, then deliver it. */
@@ -1010,7 +1041,6 @@ static apr_status_t write_to_connection(
             return status;
 
         if (request && conn->hit_eof && conn->vec_len == 0) {
-            serf_bucket_t *trk_bkt;
             /* If we hit the end of the request bucket and all of its data has
              * been written, then clear it out to signify that we're done
              * sending the request. On the next iteration through this loop:
@@ -1019,16 +1049,6 @@ static apr_status_t write_to_connection(
              * - we'll see if there are other requests that need to be sent 
              * ("pipelining").
              */
-            request->writing = SERF_WRITING_DONE;
-
-            /* We don't know when the request writing is finished, but we know
-               how to track that... Let's introduce a callback that is called
-               when we write again */
-            /* ### More efficient to use other bucket type? */
-            trk_bkt = serf_bucket_aggregate_create(conn->allocator);
-            serf_bucket_aggregate_hold_open(trk_bkt, request_writing_finished,
-                                            request);
-            serf_bucket_aggregate_prepend(ostreamt, trk_bkt);
 
             /* Move the request to the written queue */
             serf__link_requests(&conn->written_reqs, &conn->written_reqs_tail,

Modified: serf/trunk/serf_private.h
URL: 
http://svn.apache.org/viewvc/serf/trunk/serf_private.h?rev=1713489&r1=1713488&r2=1713489&view=diff
==============================================================================
--- serf/trunk/serf_private.h (original)
+++ serf/trunk/serf_private.h Mon Nov  9 17:49:59 2015
@@ -636,4 +636,20 @@ void serf__log(apr_uint32_t level, apr_u
 int serf__log_enabled(apr_uint32_t level, apr_uint32_t comp,
                       serf_config_t *config);
 
+
+/* Event bucket */
+
+extern const serf_bucket_type_t serf_bucket_type__event;
+#define SERF__BUCKET_IS_EVENT(b) SERF_BUCKET_CHECK((b), _event)
+
+typedef apr_status_t(*serf_bucket_event_callback_t)(void *baton);
+
+serf_bucket_t *serf__bucket_event_create(
+                        void *baton,
+                        serf_bucket_event_callback_t eof_cb,
+                        serf_bucket_event_callback_t destroy_cb,
+                        serf_bucket_alloc_t *allocator);
+
+
+
 #endif


Reply via email to