Author: rhuijben
Date: Mon Nov 23 16:41:12 2015
New Revision: 1715871
URL: http://svn.apache.org/viewvc?rev=1715871&view=rev
Log:
Backport recent pump fixes to the pump-investigate branch to allow testing
them on OS/X.
* buckets/ssl_buckets.c
* incoming.c
* pump.c
* serf_private.h
* test/serf_httpd.c
* test/test_server.c
Merge some recent work.
Modified:
serf/branches/pump-investigate/buckets/ssl_buckets.c
serf/branches/pump-investigate/incoming.c
serf/branches/pump-investigate/pump.c
serf/branches/pump-investigate/serf_private.h
serf/branches/pump-investigate/test/test_server.c
Modified: serf/branches/pump-investigate/buckets/ssl_buckets.c
URL:
http://svn.apache.org/viewvc/serf/branches/pump-investigate/buckets/ssl_buckets.c?rev=1715871&r1=1715870&r2=1715871&view=diff
==============================================================================
--- serf/branches/pump-investigate/buckets/ssl_buckets.c (original)
+++ serf/branches/pump-investigate/buckets/ssl_buckets.c Mon Nov 23 16:41:12
2015
@@ -177,7 +177,7 @@ struct serf_ssl_context_t {
const char *selected_protocol; /* Cached protocol value once available */
/* Protocol callback */
- serf_ssl_protocol_result_cb_t protocol_callback;
+ serf_ssl_protocol_result_cb_t protocol_callback;
void *protocol_userdata;
serf_config_t *config;
@@ -255,7 +255,7 @@ apps_ssl_info_callback(const SSL *s, int
if (ret > 0) {
/* ret > 0: Just a state change; not an error */
serf__log(level, LOGCOMP_SSL, __FILE__, ctx->config,
- "%s: %s\n",
+ "%s: %s (%d)\n",
str, SSL_state_string_long(s),
ctx->crypt_status);
}
@@ -282,7 +282,7 @@ apps_ssl_info_callback(const SSL *s, int
#endif
-/* Listens for the SSL renegotiate ciphers alert and report it back to the
+/* Listens for the SSL renegotiate ciphers alert and report it back to the
serf context. */
static void
detect_renegotiate(const SSL *s, int where, int ret)
@@ -647,7 +647,7 @@ get_subject_alt_names(apr_array_header_t
}
sk_GENERAL_NAME_pop_free(names, GENERAL_NAME_free);
}
-
+
return APR_SUCCESS;
}
@@ -698,7 +698,7 @@ validate_server_certificate(int cert_val
int err = X509_STORE_CTX_get_error(store_ctx);
switch(err) {
- case X509_V_ERR_CERT_NOT_YET_VALID:
+ case X509_V_ERR_CERT_NOT_YET_VALID:
failures |= SERF_SSL_CERT_NOTYETVALID;
break;
case X509_V_ERR_CERT_HAS_EXPIRED:
@@ -840,7 +840,7 @@ validate_server_certificate(int cert_val
{
ctx->pending_err = SERF_ERROR_SSL_CERT_FAILED;
}
-
+
return cert_valid;
}
@@ -1005,7 +1005,7 @@ static apr_status_t ssl_decrypt(void *ba
serf__log(LOGLVL_DEBUG, LOGCOMP_SSLMSG, __FILE__, ctx->config,
"---\n%.*s\n-(%d)-\n", *len, buf, *len);
}
-
+
if (!ctx->handshake_finished
&& !SERF_BUCKET_READ_ERROR(status)) {
@@ -1166,7 +1166,7 @@ static apr_status_t ssl_encrypt(void *ba
serf__log(LOGLVL_DEBUG, LOGCOMP_SSL, __FILE__, ctx->config,
"---\n%.*s\n-(%d)-\n",
interim_len, vecs_data, interim_len);
-
+
}
}
}
@@ -1352,10 +1352,10 @@ static void init_ssl_libraries(void)
thread has completed */
while (val != INIT_DONE) {
apr_sleep(APR_USEC_PER_SEC / 1000);
-
+
val = apr_atomic_cas32(&have_init_ssl,
INIT_UNINITIALIZED,
- INIT_UNINITIALIZED);
+ INIT_UNINITIALIZED);
}
}
}
@@ -2196,7 +2196,7 @@ const char *serf_ssl_cert_export(
encoded_cert = apr_palloc(pool, apr_base64_encode_len(len));
apr_base64_encode(encoded_cert, binary_cert, len);
-
+
return encoded_cert;
}
Modified: serf/branches/pump-investigate/incoming.c
URL:
http://svn.apache.org/viewvc/serf/branches/pump-investigate/incoming.c?rev=1715871&r1=1715870&r2=1715871&view=diff
==============================================================================
--- serf/branches/pump-investigate/incoming.c (original)
+++ serf/branches/pump-investigate/incoming.c Mon Nov 23 16:41:12 2015
@@ -32,12 +32,13 @@ static apr_status_t client_connected(ser
{
/* serf_context_t *ctx = client->ctx; */
apr_status_t status;
+ serf_bucket_t *stream;
serf_bucket_t *ostream;
serf_pump__store_ipaddresses_in_config(&client->pump);
serf__log(LOGLVL_DEBUG, LOGCOMP_CONN, __FILE__, client->config,
- "socket for client 0x%x connected\n", client);
+ "socket for client 0x%p connected\n", client);
/* ### Connection does auth setup here */
@@ -46,17 +47,17 @@ static apr_status_t client_connected(ser
ostream = client->pump.ostream_tail;
status = client->setup(client->skt,
- &client->pump.stream,
+ &stream,
&ostream,
client->setup_baton, client->pool);
if (status) {
- serf_pump__complete_setup(&client->pump, NULL);
+ serf_pump__complete_setup(&client->pump, NULL, NULL);
/* ### Cleanup! (serf__connection_pre_cleanup) */
return status;
}
- serf_pump__complete_setup(&client->pump, ostream);
+ serf_pump__complete_setup(&client->pump, stream, ostream);
if (client->framing_type == SERF_CONNECTION_FRAMING_TYPE_NONE) {
client->proto_peek_bkt = serf_bucket_aggregate_create(
Modified: serf/branches/pump-investigate/pump.c
URL:
http://svn.apache.org/viewvc/serf/branches/pump-investigate/pump.c?rev=1715871&r1=1715870&r2=1715871&view=diff
==============================================================================
--- serf/branches/pump-investigate/pump.c (original)
+++ serf/branches/pump-investigate/pump.c Mon Nov 23 16:41:12 2015
@@ -29,7 +29,7 @@
#include "serf_private.h"
-apr_status_t pump_cleanup(void *baton)
+static apr_status_t pump_cleanup(void *baton)
{
serf_pump_t *pump = baton;
@@ -42,6 +42,11 @@ apr_status_t pump_cleanup(void *baton)
pump->ostream_tail = NULL;
}
+ pump->pool = NULL; /* Don't run again */
+ pump->allocator = NULL;
+ pump->skt = NULL;
+ pump->vec_len = 0;
+
return APR_SUCCESS;
}
@@ -58,11 +63,34 @@ void serf_pump__init(serf_pump_t *pump,
pump->allocator = allocator;
pump->config = config;
pump->skt = skt;
+ pump->pool = pool;
apr_pool_cleanup_register(pool, pump, pump_cleanup,
apr_pool_cleanup_null);
}
+void serf_pump__done(serf_pump_t *pump)
+{
+ if (pump->pool) {
+ apr_pool_cleanup_run(pump->pool, pump, pump_cleanup);
+ }
+
+ pump->io = NULL;
+ pump->allocator = NULL;
+ pump->config = NULL;
+
+ /* pump->stream is managed by the current reader! */
+
+ pump->ostream_head = NULL;
+ pump->ostream_tail = NULL;
+
+ pump->done_writing = false;
+ pump->stop_writing = false;
+ pump->hit_eof = false;
+
+ pump->pool = NULL;
+}
+
/* Safely check if there is still data pending on the connection, carefull
to not accidentally make it invalid. */
bool serf_pump__data_pending(serf_pump_t *pump)
@@ -78,9 +106,6 @@ bool serf_pump__data_pending(serf_pump_t
status = serf_bucket_peek(pump->ostream_head, &data, &len);
if (!SERF_BUCKET_READ_ERROR(status)) {
if (len > 0) {
- serf__log(LOGLVL_DEBUG, LOGCOMP_CONN, __FILE__, pump->config,
- "Extra data to be written after sending complete "
- "requests.\n");
return true;
}
}
@@ -118,8 +143,10 @@ void serf_pump__prepare_setup(serf_pump_
}
void serf_pump__complete_setup(serf_pump_t *pump,
+ serf_bucket_t *stream,
serf_bucket_t *ostream)
{
+ pump->stream = stream;
if (ostream)
serf_bucket_aggregate_append(pump->ostream_head, ostream);
else
@@ -133,7 +160,10 @@ void serf_pump__complete_setup(serf_pump
/* Share the configuration with the ssl_decrypt and socket buckets. The
response buckets wrapping the ssl_decrypt/socket buckets won't get the
config automatically because they are upstream. */
- serf_bucket_set_config(pump->stream, pump->config);
+ if (stream != NULL) {
+ pump->stream = stream;
+ serf_bucket_set_config(pump->stream, pump->config);
+ }
/* We typically have one of two scenarios, based on whether the
application decided to encrypt this connection:
@@ -162,13 +192,13 @@ void serf_pump__store_ipaddresses_in_con
char buf[48];
if (!apr_sockaddr_ip_getbuf(buf, sizeof(buf), sa))
serf_config_set_stringf(pump->config, SERF_CONFIG_CONN_LOCALIP,
- "%s:%d", buf, sa->port);
+ "%s:%d", buf, (int)sa->port);
}
if (apr_socket_addr_get(&sa, APR_REMOTE, pump->skt) == APR_SUCCESS) {
char buf[48];
if (!apr_sockaddr_ip_getbuf(buf, sizeof(buf), sa))
serf_config_set_stringf(pump->config, SERF_CONFIG_CONN_REMOTEIP,
- "%s:%d", buf, sa->port);
+ "%s:%d", buf, (int)sa->port);
}
}
@@ -177,7 +207,7 @@ static apr_status_t no_more_writes(serf_
/* Note that we should hold new requests until we open our new socket. */
pump->done_writing = true;
serf__log(LOGLVL_DEBUG, LOGCOMP_CONN, __FILE__, pump->config,
- "stop writing on 0x%x\n", pump->io->u.conn);
+ "stop writing on 0x%p\n", pump->io->u.v);
/* Clear our iovec. */
pump->vec_len = 0;
@@ -199,7 +229,7 @@ static apr_status_t socket_writev(serf_p
pump->vec_len, &written);
if (status && !APR_STATUS_IS_EAGAIN(status))
serf__log(LOGLVL_DEBUG, LOGCOMP_CONN, __FILE__, pump->config,
- "socket_sendv error %d\n", status);
+ "socket_sendv error %d on 0x%p\n", status, pump->io->u.v);
/* did we write everything? */
if (written) {
@@ -207,26 +237,31 @@ static apr_status_t socket_writev(serf_p
int i;
serf__log(LOGLVL_DEBUG, LOGCOMP_CONN, __FILE__, conn->config,
- "--- socket_sendv: %d bytes. --\n", written);
+ "--- socket_sendv: %d bytes on 0x%p. --\n",
+ (int)written, pump->io->u.v);
for (i = 0; i < conn->vec_len; i++) {
len += conn->vec[i].iov_len;
if (written < len) {
serf__log_nopref(LOGLVL_DEBUG, LOGCOMP_RAWMSG, conn->config,
- "%.*s", conn->vec[i].iov_len - (len -
written),
- conn->vec[i].iov_base);
+ "%.*s",
+ (int)(conn->vec[i].iov_len - (len - written)),
+ (const char *)conn->vec[i].iov_base);
if (i) {
memmove(conn->vec, &conn->vec[i],
sizeof(struct iovec) * (conn->vec_len - i));
conn->vec_len -= i;
}
- conn->vec[0].iov_base = (char *)conn->vec[0].iov_base +
(conn->vec[0].iov_len - (len - written));
+ conn->vec[0].iov_base = (char *)conn->vec[0].iov_base
+ + conn->vec[0].iov_len
+ - (len - written);
conn->vec[0].iov_len = len - written;
break;
} else {
serf__log_nopref(LOGLVL_DEBUG, LOGCOMP_RAWMSG, conn->config,
"%.*s",
- conn->vec[i].iov_len, conn->vec[i].iov_base);
+ (int)conn->vec[i].iov_len,
+ (const char*)conn->vec[i].iov_base);
}
}
if (len == written) {
@@ -246,16 +281,15 @@ apr_status_t serf_pump__write(serf_pump_
{
apr_status_t status = APR_SUCCESS;
apr_status_t read_status = APR_SUCCESS;
- serf_pump_t *const conn = pump;
- conn->hit_eof = FALSE;
+ pump->hit_eof = FALSE;
while (status == APR_SUCCESS) {
/* First try to write out what is already stored in the
connection vecs. */
- while (conn->vec_len && !status) {
- status = socket_writev(conn);
+ while (pump->vec_len && !status) {
+ status = socket_writev(pump);
/* If the write would have blocked, then we're done.
* Don't try to write anything else to the socket.
@@ -263,12 +297,22 @@ apr_status_t serf_pump__write(serf_pump_
if (APR_STATUS_IS_EPIPE(status)
|| APR_STATUS_IS_ECONNRESET(status)
|| APR_STATUS_IS_ECONNABORTED(status))
- return no_more_writes(conn);
+ return no_more_writes(pump);
}
- if (status || !pump)
+ if (status || !fetch_new) {
+
+ /* If we couldn't write everything that we tried,
+ make sure that we will receive a write event next time */
+ if (APR_STATUS_IS_EAGAIN(status)
+ && !pump->io->dirty_conn
+ && !(pump->io->reqevents & APR_POLLOUT))
+ {
+ serf_io__set_pollset_dirty(pump->io);
+ }
return status;
- else if (read_status || conn->vec_len || conn->hit_eof)
+ }
+ else if (read_status || pump->vec_len || pump->hit_eof)
return read_status;
/* ### optimize at some point by using read_for_sendfile */
@@ -276,12 +320,12 @@ apr_status_t serf_pump__write(serf_pump_
data as available, we probably don't want to read ALL_AVAIL, but
a lower number, like the size of one or a few TCP packets, the
available TCP buffer size ... */
- conn->hit_eof = 0;
+ pump->hit_eof = false;
read_status = serf_bucket_read_iovec(pump->ostream_head,
SERF_READ_ALL_AVAIL,
IOV_MAX,
- conn->vec,
- &conn->vec_len);
+ pump->vec,
+ &pump->vec_len);
if (read_status == SERF_ERROR_WAIT_CONN) {
/* The bucket told us that it can't provide more data until
@@ -293,15 +337,20 @@ apr_status_t serf_pump__write(serf_pump_
we can actually write something. otherwise, we could
end up in a CPU spin: socket wants something, but we
don't have anything (and keep returning EAGAIN) */
- conn->stop_writing = true;
- serf_io__set_pollset_dirty(conn->io);
+
+ serf__log(LOGLVL_INFO, LOGCOMP_CONN, __FILE__, pump->config,
+ "Output stream requested temporary write delay "
+ "on 0x%p\n", pump->io->u.v);
+
+ pump->stop_writing = true;
+ serf_io__set_pollset_dirty(pump->io);
read_status = APR_EAGAIN;
}
else if (APR_STATUS_IS_EAGAIN(read_status)) {
/* We read some stuff, but did we read everything ? */
- if (conn->hit_eof)
+ if (pump->hit_eof)
read_status = APR_SUCCESS;
}
else if (SERF_BUCKET_READ_ERROR(read_status)) {
@@ -314,3 +363,37 @@ apr_status_t serf_pump__write(serf_pump_
return status;
}
+apr_status_t serf_pump__add_output(serf_pump_t *pump,
+ serf_bucket_t *bucket,
+ bool flush)
+{
+ apr_status_t status;
+
+ if (!flush
+ && !pump->io->dirty_conn
+ && !pump->stop_writing
+ && !(pump->io->reqevents & APR_POLLOUT)
+ && !serf_pump__data_pending(pump))
+ {
+ /* If not writing now,
+ * and not already dirty
+ * and nothing pending yet
+ Then mark the pollset dirty to trigger a write */
+
+ serf_io__set_pollset_dirty(pump->io);
+ }
+
+ serf_bucket_aggregate_append(pump->ostream_tail, bucket);
+
+ if (!flush)
+ return APR_SUCCESS;
+
+ /* Flush final output buffer (after ssl, etc.) */
+ status = serf_pump__write(pump, TRUE);
+
+ if (SERF_BUCKET_READ_ERROR(status))
+ return status;
+ else
+ return APR_SUCCESS;
+}
+
Modified: serf/branches/pump-investigate/serf_private.h
URL:
http://svn.apache.org/viewvc/serf/branches/pump-investigate/serf_private.h?rev=1715871&r1=1715870&r2=1715871&view=diff
==============================================================================
--- serf/branches/pump-investigate/serf_private.h (original)
+++ serf/branches/pump-investigate/serf_private.h Mon Nov 23 16:41:12 2015
@@ -136,6 +136,7 @@ typedef struct serf_io_baton_t {
serf_incoming_t *client;
serf_connection_t *conn;
serf_listener_t *listener;
+ const void *const v;
} u;
/* are we a dirty connection that needs its poll status updated? */
@@ -147,14 +148,18 @@ typedef struct serf_io_baton_t {
} serf_io_baton_t;
-typedef struct serf_pump_io_t
+typedef struct serf_pump_t
{
serf_io_baton_t *io;
serf_bucket_alloc_t *allocator;
serf_config_t *config;
+ /* The incoming stream. Stored here for easy access by users,
+ but not managed as part of the pump */
serf_bucket_t *stream;
+
+ /* The outgoing stream */
serf_bucket_t *ostream_head;
serf_bucket_t *ostream_tail;
@@ -171,6 +176,8 @@ typedef struct serf_pump_io_t
/* Set to true when ostream_tail was read to EOF */
bool hit_eof;
+
+ apr_pool_t *pool;
} serf_pump_t;
@@ -769,15 +776,23 @@ void serf_pump__init(serf_pump_t *pump,
serf_bucket_alloc_t *allocator,
apr_pool_t *pool);
+void serf_pump__done(serf_pump_t *pump);
+
bool serf_pump__data_pending(serf_pump_t *pump);
void serf_pump__store_ipaddresses_in_config(serf_pump_t *pump);
apr_status_t serf_pump__write(serf_pump_t *pump,
bool fetch_new);
+apr_status_t serf_pump__add_output(serf_pump_t *pump,
+ serf_bucket_t *bucket,
+ bool flush);
+
/* These must always be called as a pair to avoid a memory leak */
void serf_pump__prepare_setup(serf_pump_t *pump);
-void serf_pump__complete_setup(serf_pump_t *pump, serf_bucket_t *ostream);
+void serf_pump__complete_setup(serf_pump_t *pump,
+ serf_bucket_t *stream,
+ serf_bucket_t *ostream);
/** Logging functions. **/
Modified: serf/branches/pump-investigate/test/test_server.c
URL:
http://svn.apache.org/viewvc/serf/branches/pump-investigate/test/test_server.c?rev=1715871&r1=1715870&r2=1715871&view=diff
==============================================================================
--- serf/branches/pump-investigate/test/test_server.c (original)
+++ serf/branches/pump-investigate/test/test_server.c Mon Nov 23 16:41:12 2015
@@ -247,8 +247,8 @@ CuSuite *test_server(void)
CuSuiteSetSetupTeardownCallbacks(suite, test_setup, test_teardown);
- SUITE_ADD_TEST(suite, test_listen_http);
- SUITE_ADD_TEST(suite, test_listen_http2);
+ /*SUITE_ADD_TEST(suite, test_listen_http);
+ SUITE_ADD_TEST(suite, test_listen_http2);*/
return suite;
}