Author: rhuijben
Date: Wed Nov 18 16:56:09 2015
New Revision: 1715022

URL: http://svn.apache.org/viewvc?rev=1715022&view=rev
Log:
Start using the bucket pump for outgoing connections. This switches some of
the easy code over to using the pump, while other parts now use the pump
state directly.

* incoming.c
  (client_connected): Update caller.

* outgoing.c
  (data_pending): Remove function.
  (request_or_data_pending): Update caller.
  (serf__conn_update_pollset): Use serf_pump__data_pending().
  (serf__connection_pre_cleanup): Use the pump for cleaning.
  (do_conn_setup): Use new helper functions.
  (prepare_conn_streams): Update usage.
  (store_ipaddresses_in_config): Remove function.
  (connect_connection): Update caller.
  (serf__open_connections): Initialize pump.
  (no_more_writes): Update user.
  (reset_connection): Update caller.
  (socket_writev): Switch argument type.
  (serf__connection_flush): Use pump vars.
  (handle_async_response,
   read_from_connection): Update caller.
  (serf_connection_create): Simplify init.

* protocols/fcgi_protocol.c
  (fcgi_outgoing_read,
   serf__fcgi_protocol_init): Update init.

* protocols/http2_protocol.c
  (serf__http2_protocol_init,
   http2_outgoing_read): Update init.

* pump.c
  (serf_pump__init): Init pool.
  (serf_pump__done): New function.
  (serf_pump__complete_setup): Add stream argument.

* serf_private.h
  (serf_pump_t): Fix name. Add pool.
  (serf_connection_t): Add pump. Remove old vars.
  (serf_pump__done): New function.
  (serf_pump__complete_setup): New function.

* ssltunnel.c
  (handle_response,
   detect_eof,
   serf__ssltunnel_connect): Update usages.

Modified:
    serf/trunk/incoming.c
    serf/trunk/outgoing.c
    serf/trunk/protocols/fcgi_protocol.c
    serf/trunk/protocols/http2_protocol.c
    serf/trunk/pump.c
    serf/trunk/serf_private.h
    serf/trunk/ssltunnel.c

Modified: serf/trunk/incoming.c
URL: 
http://svn.apache.org/viewvc/serf/trunk/incoming.c?rev=1715022&r1=1715021&r2=1715022&view=diff
==============================================================================
--- serf/trunk/incoming.c (original)
+++ serf/trunk/incoming.c Wed Nov 18 16:56:09 2015
@@ -32,6 +32,7 @@ static apr_status_t client_connected(ser
 {
     /* serf_context_t *ctx = client->ctx; */
     apr_status_t status;
+    serf_bucket_t *stream;
     serf_bucket_t *ostream;
 
     serf_pump__store_ipaddresses_in_config(&client->pump);
@@ -46,17 +47,17 @@ static apr_status_t client_connected(ser
     ostream = client->pump.ostream_tail;
 
     status = client->setup(client->skt,
-                           &client->pump.stream,
+                           &stream,
                            &ostream,
                            client->setup_baton, client->pool);
 
     if (status) {
-        serf_pump__complete_setup(&client->pump, NULL);
+        serf_pump__complete_setup(&client->pump, NULL, NULL);
         /* ### Cleanup! (serf__connection_pre_cleanup) */
         return status;
     }
 
-    serf_pump__complete_setup(&client->pump, ostream);
+    serf_pump__complete_setup(&client->pump, stream, ostream);
 
     if (client->framing_type == SERF_CONNECTION_FRAMING_TYPE_NONE) {
         client->proto_peek_bkt = serf_bucket_aggregate_create(

Modified: serf/trunk/outgoing.c
URL: 
http://svn.apache.org/viewvc/serf/trunk/outgoing.c?rev=1715022&r1=1715021&r2=1715022&view=diff
==============================================================================
--- serf/trunk/outgoing.c (original)
+++ serf/trunk/outgoing.c Wed Nov 18 16:56:09 2015
@@ -67,36 +67,6 @@ static apr_status_t clean_conn(void *dat
     return APR_SUCCESS;
 }
 
-/* Safely check if there is still data pending on the connection, carefull
-   to not accidentally make it invalid. */
-static int
-data_pending(serf_connection_t *conn)
-{
-    if (conn->vec_len > 0)
-        return TRUE; /* We can't poll right now! */
-
-    if (conn->ostream_head) {
-        const char *dummy;
-        apr_size_t len;
-        apr_status_t status;
-
-        status = serf_bucket_peek(conn->ostream_head, &dummy,
-                                  &len);
-        if (!SERF_BUCKET_READ_ERROR(status)) {
-            if (len > 0) {
-                serf__log(LOGLVL_DEBUG, LOGCOMP_CONN, __FILE__, conn->config,
-                          "Extra data to be written after sending complete "
-                          "requests.\n");
-                return TRUE;
-            }
-        }
-        else
-            return TRUE; /* Sure, we have data (an error) */
-    }
-
-    return FALSE;
-}
-
 static int
 request_pending(serf_request_t **next_req, serf_connection_t *conn)
 {
@@ -136,7 +106,7 @@ request_or_data_pending(serf_request_t *
     if (request_pending(next_req, conn))
         return TRUE;
 
-    return data_pending(conn);
+    return serf_pump__data_pending(&conn->pump);
 }
 
 /* Update the pollset for this connection. We tweak the pollset based on
@@ -181,41 +151,8 @@ apr_status_t serf__conn_update_pollset(s
            But it also has the nice side effect of removing references
            from the aggregate to requests that are done.
          */
-        if (conn->vec_len) {
-            /* We still have vecs in the connection, which lifetime is
-               managed by buckets inside conn->ostream_head.
-
-               Don't touch ostream as that might destroy the vecs */
-
-            data_waiting = (conn->state != SERF_CONN_CLOSING);
-        }
-        else {
-            serf_bucket_t *ostream;
-
-            ostream = conn->ostream_head;
-
-            if (!ostream)
-              ostream = conn->ssltunnel_ostream;
-
-            if (ostream) {
-                const char *dummy_data;
-                apr_size_t len;
-
-                status = serf_bucket_peek(ostream, &dummy_data, &len);
-
-                if (SERF_BUCKET_READ_ERROR(status) || len > 0) {
-                    /* DATA or error waiting */
-                    data_waiting = TRUE; /* Error waiting */
-                }
-                else if (! status || APR_STATUS_IS_EOF(status)) {
-                    data_waiting = FALSE;
-                }
-                else
-                    data_waiting = FALSE; /* EAGAIN / EOF / WAIT_CONN */
-            }
-            else
-                data_waiting = FALSE;
-        }
+        data_waiting = serf_pump__data_pending(&conn->pump)
+                       && (conn->state != SERF_CONN_CLOSING);
 
         if (data_waiting) {
             desc.reqevents |= APR_POLLOUT;
@@ -229,7 +166,7 @@ apr_status_t serf__conn_update_pollset(s
         desc.reqevents |= APR_POLLIN;
 
         /* Don't write if OpenSSL told us that it needs to read data first. */
-        if (! conn->stop_writing && !data_waiting) {
+        if (! conn->pump.stop_writing && !data_waiting) {
 
             /* This check is duplicated in write_to_connection() */
             if ((conn->probable_keepalive_limit &&
@@ -288,16 +225,10 @@ static void check_buckets_drained(serf_c
 void serf__connection_pre_cleanup(serf_connection_t *conn)
 {
     serf_request_t *rq;
-    conn->vec_len = 0;
+    conn->pump.vec_len = 0;
+
+    serf_pump__done(&conn->pump);
 
-    if (conn->ostream_head != NULL) {
-#ifdef SERF_DEBUG_BUCKET_USE
-        serf__bucket_drain(conn->ostream_head);
-#endif
-        serf_bucket_destroy(conn->ostream_head);
-        conn->ostream_head = NULL;
-        conn->ostream_tail = NULL;
-    }
     if (conn->ssltunnel_ostream != NULL) {
         serf_bucket_destroy(conn->ssltunnel_ostream);
         conn->ssltunnel_ostream = NULL;
@@ -324,58 +255,32 @@ void serf__connection_pre_cleanup(serf_c
     conn->done_reqs = conn->done_reqs_tail = NULL;
 }
 
-static apr_status_t detect_eof(void *baton, serf_bucket_t *aggregate_bucket)
-{
-    serf_connection_t *conn = baton;
-    conn->hit_eof = 1;
-    return APR_EAGAIN;
-}
-
 static apr_status_t do_conn_setup(serf_connection_t *conn)
 {
     apr_status_t status;
-    serf_bucket_t *ostream;
+    serf_bucket_t *ostream, *stream;
 
     /* ### dunno what the hell this is about. this latency stuff got
        ### added, and who knows whether it should stay...  */
     conn->latency = apr_time_now() - conn->connect_time;
 
-    if (conn->ostream_head == NULL) {
-        conn->ostream_head = serf_bucket_aggregate_create(conn->allocator);
-    }
+    serf_pump__prepare_setup(&conn->pump);
 
-    if (conn->ostream_tail == NULL) {
-        conn->ostream_tail = serf_bucket_aggregate_create(conn->allocator);
-
-        serf_bucket_aggregate_hold_open(conn->ostream_tail, detect_eof, conn);
-    }
-
-    ostream = conn->ostream_tail;
+    ostream = conn->pump.ostream_tail;
 
     status = (*conn->setup)(conn->skt,
-                            &conn->stream,
+                            &stream,
                             &ostream,
                             conn->setup_baton,
                             conn->pool);
     if (status) {
         /* extra destroy here since it wasn't added to the head bucket yet. */
-        serf_bucket_destroy(conn->ostream_tail);
+        serf_pump__complete_setup(&conn->pump, NULL, NULL);
         serf__connection_pre_cleanup(conn);
         return status;
     }
 
-    /* Share the configuration with all the buckets in the newly created output
-     chain (see PLAIN or ENCRYPTED scenario's), including the request buckets
-     created by the application (ostream_tail will handle this for us). */
-    serf_bucket_set_config(conn->ostream_head, conn->config);
-
-    /* Share the configuration with the ssl_decrypt and socket buckets. The
-     response buckets wrapping the ssl_decrypt/socket buckets won't get the
-     config automatically because they are upstream. */
-    serf_bucket_set_config(conn->stream, conn->config);
-
-    serf_bucket_aggregate_append(conn->ostream_head,
-                                 ostream);
+    serf_pump__complete_setup(&conn->pump, stream, ostream);
 
     /* We typically have one of two scenarios, based on whether the
        application decided to encrypt this connection:
@@ -419,22 +324,23 @@ static apr_status_t prepare_conn_streams
         /* If the connection does not have an associated bucket, then
          * call the setup callback to get one.
          */
-        if (conn->stream == NULL) {
+        if (conn->pump.stream == NULL) {
             status = do_conn_setup(conn);
             if (status) {
                 return status;
             }
         }
-        *ostreamt = conn->ostream_tail;
-        *ostreamh = conn->ostream_head;
+        *ostreamt = conn->pump.ostream_tail;
+        *ostreamh = conn->pump.ostream_head;
     } else if (conn->state == SERF_CONN_SETUP_SSLTUNNEL) {
 
         /* SSL tunnel needed and not set up yet, get a direct unencrypted
          stream for this socket */
-        if (conn->stream == NULL) {
-            conn->stream = serf_context_bucket_socket_create(conn->ctx,
-                                                             conn->skt,
-                                                             conn->allocator);
+        if (conn->pump.stream == NULL) {
+            conn->pump.stream =
+                serf_context_bucket_socket_create(conn->ctx,
+                                                  conn->skt,
+                                                  conn->allocator);
         }
 
         /* Don't create the ostream bucket chain including the ssl_encrypt
@@ -444,38 +350,19 @@ static apr_status_t prepare_conn_streams
     } else {
         /* SERF_CONN_CLOSING or SERF_CONN_INIT */
 
-        *ostreamt = conn->ostream_tail;
-        *ostreamh = conn->ostream_head;
+        *ostreamt = conn->pump.ostream_tail;
+        *ostreamh = conn->pump.ostream_head;
     }
 
     return APR_SUCCESS;
 }
 
-static void store_ipaddresses_in_config(serf_config_t *config,
-                                        apr_socket_t *skt)
-{
-     apr_sockaddr_t *sa;
-
-    if (apr_socket_addr_get(&sa, APR_LOCAL, skt) == APR_SUCCESS) {
-        char buf[48];
-        if (!apr_sockaddr_ip_getbuf(buf, sizeof(buf), sa))
-            serf_config_set_stringf(config, SERF_CONFIG_CONN_LOCALIP,
-                                    "%s:%d", buf, sa->port);
-    }
-    if (apr_socket_addr_get(&sa, APR_REMOTE, skt) == APR_SUCCESS) {
-        char buf[48];
-        if (!apr_sockaddr_ip_getbuf(buf, sizeof(buf), sa))
-            serf_config_set_stringf(config, SERF_CONFIG_CONN_REMOTEIP,
-                                    "%s:%d", buf, sa->port);
-    }
-}
-
 static apr_status_t connect_connection(serf_connection_t *conn)
 {
     serf_context_t *ctx = conn->ctx;
     apr_status_t status;
 
-    store_ipaddresses_in_config(conn->config, conn->skt);
+    serf_pump__store_ipaddresses_in_config(&conn->pump);
 
     serf__log(LOGLVL_DEBUG, LOGCOMP_CONN, __FILE__, conn->config,
               "socket for conn 0x%x connected\n", conn);
@@ -581,6 +468,9 @@ apr_status_t serf__open_connections(serf
         if (status)
             return status;
 
+        serf_pump__init(&conn->pump, &conn->io, skt, conn->config,
+                        conn->allocator, conn->pool);
+
         /* Flag our pollset as dirty now that we have a new socket. */
         serf_io__set_pollset_dirty(&conn->io);
 
@@ -603,7 +493,7 @@ static apr_status_t no_more_writes(serf_
               "stop writing on conn 0x%x\n", conn);
 
     /* Clear our iovec. */
-    conn->vec_len = 0;
+    conn->pump.vec_len = 0;
 
     /* Update the pollset to know we don't want to write on this socket any
      * more.
@@ -717,22 +607,22 @@ static apr_status_t reset_connection(ser
         }
     }
 
-    if (conn->stream != NULL) {
-        serf_bucket_destroy(conn->stream);
-        conn->stream = NULL;
+    if (conn->pump.stream != NULL) {
+        serf_bucket_destroy(conn->pump.stream);
+        conn->pump.stream = NULL;
     }
 
     /* Don't try to resume any writes */
-    conn->vec_len = 0;
+    conn->pump.vec_len = 0;
 
     serf_io__set_pollset_dirty(&conn->io);
     conn->state = SERF_CONN_INIT;
 
-    conn->hit_eof = 0;
+    conn->pump.hit_eof = 0;
     conn->connect_time = 0;
     conn->latency = -1;
-    conn->stop_writing = 0;
-    conn->write_now = 0;
+    conn->pump.stop_writing = false;
+    conn->write_now = false;
     /* conn->pipelining */
 
     conn->framing_type = SERF_CONNECTION_FRAMING_TYPE_HTTP1;
@@ -760,7 +650,7 @@ static apr_status_t reset_connection(ser
     return APR_SUCCESS;
 }
 
-static apr_status_t socket_writev(serf_connection_t *conn)
+static apr_status_t socket_writev(serf_pump_t *conn)
 {
     apr_size_t written;
     apr_status_t status;
@@ -805,7 +695,7 @@ static apr_status_t socket_writev(serf_c
         serf__log_nopref(LOGLVL_DEBUG, LOGCOMP_RAWMSG, conn->config, "\n");
 
         /* Log progress information */
-        serf__context_progress_delta(conn->ctx, 0, written);
+        serf__context_progress_delta(conn->io->ctx, 0, written);
     }
 
     return status;
@@ -818,14 +708,14 @@ apr_status_t serf__connection_flush(serf
     apr_status_t read_status = APR_SUCCESS;
     serf_bucket_t *ostreamh = NULL;
 
-    conn->hit_eof = FALSE;
+    conn->pump.hit_eof = FALSE;
 
     while (status == APR_SUCCESS) {
 
         /* First try to write out what is already stored in the
            connection vecs. */
-        while (conn->vec_len && !status) {
-            status = socket_writev(conn);
+        while (conn->pump.vec_len && !status) {
+            status = socket_writev(&conn->pump);
 
             /* If the write would have blocked, then we're done.
              * Don't try to write anything else to the socket.
@@ -838,7 +728,7 @@ apr_status_t serf__connection_flush(serf
 
         if (status || !pump)
             return status;
-        else if (read_status || conn->vec_len || conn->hit_eof)
+        else if (read_status || conn->pump.vec_len || conn->pump.hit_eof)
             return read_status;
 
         /* Ok, with the vecs written, we can now refill the per connection
@@ -856,12 +746,12 @@ apr_status_t serf__connection_flush(serf
            data as available, we probably don't want to read ALL_AVAIL, but
            a lower number, like the size of one or a few TCP packets, the
            available TCP buffer size ... */
-        conn->hit_eof = 0;
+        conn->pump.hit_eof = 0;
         read_status = serf_bucket_read_iovec(ostreamh,
                                              SERF_READ_ALL_AVAIL,
                                              IOV_MAX,
-                                             conn->vec,
-                                             &conn->vec_len);
+                                             conn->pump.vec,
+                                             &conn->pump.vec_len);
 
         if (read_status == SERF_ERROR_WAIT_CONN) {
             /* The bucket told us that it can't provide more data until
@@ -873,7 +763,7 @@ apr_status_t serf__connection_flush(serf
             we can actually write something. otherwise, we could
             end up in a CPU spin: socket wants something, but we
             don't have anything (and keep returning EAGAIN) */
-            conn->stop_writing = 1;
+            conn->pump.stop_writing = 1;
             serf_io__set_pollset_dirty(&conn->io);
 
             read_status = APR_EAGAIN;
@@ -881,7 +771,7 @@ apr_status_t serf__connection_flush(serf
         else if (APR_STATUS_IS_EAGAIN(read_status)) {
 
             /* We read some stuff, but did we read everything ? */
-            if (conn->hit_eof)
+            if (conn->pump.hit_eof)
                 read_status = APR_SUCCESS;
         }
         else if (SERF_BUCKET_READ_ERROR(read_status)) {
@@ -1087,7 +977,7 @@ static apr_status_t handle_async_respons
 
     if (conn->current_async_response == NULL) {
         conn->current_async_response =
-            (*conn->async_acceptor)(NULL, conn->stream,
+            (*conn->async_acceptor)(NULL, conn->pump.stream,
                                     conn->async_acceptor_baton, pool);
     }
 
@@ -1121,8 +1011,8 @@ static apr_status_t read_from_connection
 
     /* If the stop_writing flag was set on the connection, reset it now because
        there is some data to read. */
-    if (conn->stop_writing) {
-        conn->stop_writing = 0;
+    if (conn->pump.stop_writing) {
+        conn->pump.stop_writing = false;
         serf_io__set_pollset_dirty(&conn->io);
     }
 
@@ -1182,7 +1072,7 @@ static apr_status_t read_from_connection
             const char *data;
             apr_size_t len;
 
-            status = serf_bucket_peek(conn->stream, &data, &len);
+            status = serf_bucket_peek(conn->pump.stream, &data, &len);
 
             if (APR_STATUS_IS_EOF(status)) {
                 reset_connection(conn, 1);
@@ -1199,7 +1089,7 @@ static apr_status_t read_from_connection
 
             /* Unexpected response from the server */
             if (conn->write_now) {
-                conn->write_now = 0;
+                conn->write_now = false;
                 status = conn->perform_write(conn);
 
                 if (!SERF_BUCKET_READ_ERROR(status))
@@ -1220,7 +1110,7 @@ static apr_status_t read_from_connection
               return SERF_ERROR_BAD_HTTP_RESPONSE;
             }
 
-            request->resp_bkt = (*request->acceptor)(request, conn->stream,
+            request->resp_bkt = (*request->acceptor)(request, 
conn->pump.stream,
                                                      request->acceptor_baton,
                                                      tmppool);
             apr_pool_clear(tmppool);
@@ -1554,19 +1444,14 @@ serf_connection_t *serf_connection_creat
     conn->closed_baton = closed_baton;
     conn->pool = pool;
     conn->allocator = serf_bucket_allocator_create(pool, NULL, NULL);
-    conn->stream = NULL;
-    conn->ostream_head = NULL;
-    conn->ostream_tail = NULL;
     conn->io.type = SERF_IO_CONN;
     conn->io.u.conn = conn;
     conn->io.ctx = ctx;
     conn->io.dirty_conn = false;
     conn->io.reqevents = 0;
-    conn->hit_eof = 0;
     conn->state = SERF_CONN_INIT;
     conn->latency = -1; /* unknown */
-    conn->stop_writing = 0;
-    conn->write_now = 0;
+    conn->write_now = false;
     conn->wait_for_connect = 0;
     conn->pipelining = 1;
     conn->framing_type = SERF_CONNECTION_FRAMING_TYPE_HTTP1;
@@ -1698,9 +1583,9 @@ apr_status_t serf_connection_close(
                     handle_conn_closed(conn, status);
                 }
             }
-            if (conn->stream != NULL) {
-                serf_bucket_destroy(conn->stream);
-                conn->stream = NULL;
+            if (conn->pump.stream != NULL) {
+                serf_bucket_destroy(conn->pump.stream);
+                conn->pump.stream = NULL;
             }
 
             if (conn->protocol_baton) {
@@ -1782,8 +1667,8 @@ void serf_connection_set_framing_type(
 
     if (conn->skt) {
         serf_io__set_pollset_dirty(&conn->io);
-        conn->stop_writing = 0;
-        conn->write_now = 1;
+        conn->pump.stop_writing = 0;
+        conn->write_now = true;
 
         /* Close down existing protocol */
         if (conn->protocol_baton) {

Modified: serf/trunk/protocols/fcgi_protocol.c
URL: 
http://svn.apache.org/viewvc/serf/trunk/protocols/fcgi_protocol.c?rev=1715022&r1=1715021&r2=1715022&view=diff
==============================================================================
--- serf/trunk/protocols/fcgi_protocol.c (original)
+++ serf/trunk/protocols/fcgi_protocol.c Wed Nov 18 16:56:09 2015
@@ -500,7 +500,7 @@ static apr_status_t fcgi_outgoing_read(s
     serf_fcgi_protocol_t *fcgi = conn->protocol_baton;
 
     if (!fcgi->stream)
-        fcgi->stream = conn->stream;
+        fcgi->stream = conn->pump.stream;
 
     return fcgi_read(fcgi);
 }
@@ -537,8 +537,8 @@ void serf__fcgi_protocol_init(serf_conne
     fcgi->pool = protocol_pool;
     fcgi->conn = conn;
     fcgi->io = &conn->io;
-    fcgi->stream = conn->stream;
-    fcgi->ostream = conn->ostream_tail;
+    fcgi->stream = conn->pump.stream;
+    fcgi->ostream = conn->pump.ostream_tail;
     fcgi->allocator = conn->allocator;
     fcgi->config = conn->config;
 

Modified: serf/trunk/protocols/http2_protocol.c
URL: 
http://svn.apache.org/viewvc/serf/trunk/protocols/http2_protocol.c?rev=1715022&r1=1715021&r2=1715022&view=diff
==============================================================================
--- serf/trunk/protocols/http2_protocol.c (original)
+++ serf/trunk/protocols/http2_protocol.c Wed Nov 18 16:56:09 2015
@@ -239,8 +239,8 @@ void serf__http2_protocol_init(serf_conn
     h2->pool = protocol_pool;
     h2->conn = conn;
     h2->io = &conn->io;
-    h2->stream = conn->stream;
-    h2->ostream = conn->ostream_tail;
+    h2->stream = conn->pump.stream;
+    h2->ostream = conn->pump.ostream_tail;
     h2->allocator = conn->allocator;
     h2->config = conn->config;
 
@@ -1644,14 +1644,14 @@ http2_outgoing_read(serf_connection_t *c
 
     /* If the stop_writing flag was set on the connection, reset it now because
        there is some data to read. */
-    if (conn->stop_writing)
+    if (conn->pump.stop_writing)
     {
-        conn->stop_writing = 0;
+        conn->pump.stop_writing = false;
         serf_io__set_pollset_dirty(&conn->io);
     }
 
     if (h2->stream == NULL)
-        h2->stream = conn->stream;
+        h2->stream = conn->pump.stream;
 
     status = http2_process(h2);
 

Modified: serf/trunk/pump.c
URL: 
http://svn.apache.org/viewvc/serf/trunk/pump.c?rev=1715022&r1=1715021&r2=1715022&view=diff
==============================================================================
--- serf/trunk/pump.c (original)
+++ serf/trunk/pump.c Wed Nov 18 16:56:09 2015
@@ -58,11 +58,37 @@ void serf_pump__init(serf_pump_t *pump,
     pump->allocator = allocator;
     pump->config = config;
     pump->skt = skt;
+    pump->pool = pool;
 
     apr_pool_cleanup_register(pool, pump, pump_cleanup,
                               apr_pool_cleanup_null);
 }
 
+void serf_pump__done(serf_pump_t *pump)
+{
+    if (pump->pool) {
+        apr_pool_cleanup_run(pump->pool, pump, pump_cleanup);
+    }
+
+    pump->io = NULL;
+    pump->allocator = NULL;
+    pump->config = NULL;
+
+    /* pump->stream is managed by the current reader! */
+
+    pump->ostream_head = NULL;
+    pump->ostream_tail = NULL;
+
+    pump->skt = NULL;
+    pump->vec_len = 0;
+
+    pump->done_writing = false;
+    pump->stop_writing = false;
+    pump->hit_eof = false;
+
+    pump->pool = NULL;
+}
+
 /* Safely check if there is still data pending on the connection, carefull
    to not accidentally make it invalid. */
 bool serf_pump__data_pending(serf_pump_t *pump)
@@ -118,8 +144,10 @@ void serf_pump__prepare_setup(serf_pump_
 }
 
 void serf_pump__complete_setup(serf_pump_t *pump,
+                               serf_bucket_t *stream,
                                serf_bucket_t *ostream)
 {
+    pump->stream = stream;
     if (ostream)
         serf_bucket_aggregate_append(pump->ostream_head, ostream);
     else

Modified: serf/trunk/serf_private.h
URL: 
http://svn.apache.org/viewvc/serf/trunk/serf_private.h?rev=1715022&r1=1715021&r2=1715022&view=diff
==============================================================================
--- serf/trunk/serf_private.h (original)
+++ serf/trunk/serf_private.h Wed Nov 18 16:56:09 2015
@@ -147,14 +147,18 @@ typedef struct serf_io_baton_t {
 
 } serf_io_baton_t;
 
-typedef struct serf_pump_io_t
+typedef struct serf_pump_t
 {
     serf_io_baton_t *io;
 
     serf_bucket_alloc_t *allocator;
     serf_config_t *config;
 
+    /* The incoming stream. Stored here for easy access by users,
+       but not managed as part of the pump */
     serf_bucket_t *stream;
+
+    /* The outgoing stream */
     serf_bucket_t *ostream_head;
     serf_bucket_t *ostream_tail;
 
@@ -171,6 +175,8 @@ typedef struct serf_pump_io_t
 
     /* Set to true when ostream_tail was read to EOF */
     bool hit_eof;
+
+    apr_pool_t *pool;
 } serf_pump_t;
 
 
@@ -442,7 +448,6 @@ struct serf_incoming_t {
     void(*perform_teardown)(serf_incoming_t *conn);
     void *protocol_baton;
 
-
     serf_config_t *config;
 
     serf_bucket_t *proto_peek_bkt;
@@ -464,6 +469,7 @@ struct serf_connection_t {
 
     apr_status_t status;
     serf_io_baton_t io;
+    serf_pump_t pump;
 
     apr_pool_t *pool;
     serf_bucket_alloc_t *allocator;
@@ -496,14 +502,6 @@ struct serf_connection_t {
     serf_response_handler_t async_handler;
     void *async_handler_baton;
 
-    /* A bucket wrapped around our socket (for reading responses). */
-    serf_bucket_t *stream;
-    /* A reference to the aggregate bucket that provides the boundary between
-     * request level buckets and connection level buckets.
-     */
-    serf_bucket_t *ostream_head;
-    serf_bucket_t *ostream_tail;
-
     /* Aggregate bucket used to send the CONNECT request. */
     serf_bucket_t *ssltunnel_ostream;
 
@@ -524,9 +522,6 @@ struct serf_connection_t {
     serf_request_t *done_reqs;
     serf_request_t *done_reqs_tail;
 
-    struct iovec vec[IOV_MAX];
-    int vec_len;
-
     serf_connection_setup_t setup;
     void *setup_baton;
     serf_connection_closed_t closed;
@@ -542,8 +537,6 @@ struct serf_connection_t {
        only. */
     int pipelining;
 
-    int hit_eof;
-
     /* Host url, path ommitted, syntax: https://svn.apache.org . */
     const char *host_url;
 
@@ -560,11 +553,8 @@ struct serf_connection_t {
     /* Calculated connection latency. Negative value if latency is unknown. */
     apr_interval_time_t latency;
 
-    /* Needs to read first before we can write again. */
-    int stop_writing;
-
     /* Write out information now */
-    int write_now;
+    bool write_now;
 
     /* Wait for connect: connect() returned APR_EINPROGRESS.
        Socket not usable yet */
@@ -769,6 +759,8 @@ void serf_pump__init(serf_pump_t *pump,
                      serf_bucket_alloc_t *allocator,
                      apr_pool_t *pool);
 
+void serf_pump__done(serf_pump_t *pump);
+
 bool serf_pump__data_pending(serf_pump_t *pump);
 void serf_pump__store_ipaddresses_in_config(serf_pump_t *pump);
 
@@ -777,7 +769,9 @@ apr_status_t serf_pump__write(serf_pump_
 
 /* These must always be called as a pair to avoid a memory leak */
 void serf_pump__prepare_setup(serf_pump_t *pump);
-void serf_pump__complete_setup(serf_pump_t *pump, serf_bucket_t *ostream);
+void serf_pump__complete_setup(serf_pump_t *pump,
+                               serf_bucket_t *stream,
+                               serf_bucket_t *ostream);
 
 
 /** Logging functions. **/

Modified: serf/trunk/ssltunnel.c
URL: 
http://svn.apache.org/viewvc/serf/trunk/ssltunnel.c?rev=1715022&r1=1715021&r2=1715022&view=diff
==============================================================================
--- serf/trunk/ssltunnel.c (original)
+++ serf/trunk/ssltunnel.c Wed Nov 18 16:56:09 2015
@@ -111,8 +111,8 @@ static apr_status_t handle_response(serf
         apr_pool_destroy(ctx->pool);
         serf_bucket_destroy(conn->ssltunnel_ostream);
         conn->ssltunnel_ostream = NULL;
-        serf_bucket_destroy(conn->stream);
-        conn->stream = NULL;
+        serf_bucket_destroy(conn->pump.stream);
+        conn->pump.stream = NULL;
         ctx = NULL;
 
         serf__log(LOGLVL_INFO, LOGCOMP_CONN, __FILE__, conn->config,
@@ -173,7 +173,7 @@ static apr_status_t setup_request(serf_r
 static apr_status_t detect_eof(void *baton, serf_bucket_t *aggregate_bucket)
 {
     serf_connection_t *conn = baton;
-    conn->hit_eof = 1;
+    conn->pump.hit_eof = true;
     return APR_EAGAIN;
 }
 
@@ -182,6 +182,7 @@ apr_status_t serf__ssltunnel_connect(ser
 {
     req_ctx_t *ctx;
     apr_pool_t *ssltunnel_pool;
+    serf_bucket_t *ssltunnel_ostream;
 
     apr_pool_create(&ssltunnel_pool, conn->pool);
 
@@ -190,8 +191,10 @@ apr_status_t serf__ssltunnel_connect(ser
     ctx->uri = apr_psprintf(ctx->pool, "%s:%d", conn->host_info.hostname,
                             conn->host_info.port);
 
-    conn->ssltunnel_ostream = serf_bucket_aggregate_create(conn->allocator);
-    serf_bucket_aggregate_hold_open(conn->ssltunnel_ostream, detect_eof, conn);
+    ssltunnel_ostream = serf_bucket_aggregate_create(conn->allocator);
+    serf_bucket_aggregate_hold_open(ssltunnel_ostream, detect_eof, conn);
+
+    conn->ssltunnel_ostream = ssltunnel_ostream;
 
     serf__ssltunnel_request_create(conn,
                                    setup_request,


Reply via email to