In preparation for an architecture where reads and writes to the brigade
happen on the same thread, perform nonblocking reads and poll on the
socket to determine when more data is available.
---
 mod_websocket.c | 46 +++++++++++++++++++++++++++++++---------------
 1 file changed, 31 insertions(+), 15 deletions(-)

diff --git a/mod_websocket.c b/mod_websocket.c
index b5e73be..5a04936 100644
--- a/mod_websocket.c
+++ b/mod_websocket.c
@@ -381,26 +381,22 @@ static void CALLBACK mod_websocket_plugin_close(const 
WebSocketServer *
 /*
  * Read a buffer of data from the input stream.
  */
-static apr_size_t mod_websocket_read_block(request_rec *r, char *buffer,
-                                           apr_size_t bufsiz)
+static apr_status_t mod_websocket_read_nonblock(request_rec *r, char *buffer,
+                                                apr_size_t *bufsiz)
 {
-    apr_status_t rv;
+    apr_status_t rv = APR_ENOMEM;
     apr_bucket_brigade *bb;
-    apr_size_t readbufsiz = 0;
 
     bb = apr_brigade_create(r->pool, r->connection->bucket_alloc);
     if (bb != NULL) {
         if ((rv =
              ap_get_brigade(r->input_filters, bb, AP_MODE_READBYTES,
-                            APR_BLOCK_READ, bufsiz)) == APR_SUCCESS) {
-            if ((rv =
-                 apr_brigade_flatten(bb, buffer, &bufsiz)) == APR_SUCCESS) {
-                readbufsiz = bufsiz;
-            }
+                            APR_NONBLOCK_READ, *bufsiz)) == APR_SUCCESS) {
+            rv = apr_brigade_flatten(bb, buffer, bufsiz);
         }
         apr_brigade_destroy(bb);
     }
-    return readbufsiz;
+    return rv;
 }
 
 /*
@@ -475,6 +471,10 @@ static void mod_websocket_data_framing(const 
WebSocketServer *server,
     apr_pool_t *pool = NULL;
     apr_bucket_alloc_t *bucket_alloc;
     apr_bucket_brigade *obb;
+    apr_pollset_t *pollset;
+    apr_pollfd_t pollfd = { 0 };
+    const apr_pollfd_t *signalled;
+    apr_int32_t pollcnt;
 
     /* We cannot use the same bucket allocator for the ouput bucket brigade
      * obb as the one associated with the connection 
(r->connection->bucket_alloc)
@@ -487,7 +487,8 @@ static void mod_websocket_data_framing(const 
WebSocketServer *server,
 
     if ((apr_pool_create(&pool, r->pool) == APR_SUCCESS) &&
         ((bucket_alloc = apr_bucket_alloc_create(pool)) != NULL) &&
-        ((obb = apr_brigade_create(pool, bucket_alloc)) != NULL)) {
+        ((obb = apr_brigade_create(pool, bucket_alloc)) != NULL) &&
+        (apr_pollset_create(&pollset, 1, pool, 0) == APR_SUCCESS)) {
         unsigned char block[BLOCK_DATA_SIZE];
         apr_int64_t block_size;
         apr_int64_t extension_bytes_remaining = 0;
@@ -504,15 +505,30 @@ static void mod_websocket_data_framing(const 
WebSocketServer *server,
         unsigned short status_code = STATUS_CODE_OK;
         unsigned char status_code_buffer[2];
 
+        /* Initialize the pollset */
+        pollfd.p = pool;
+        pollfd.desc_type = APR_POLL_SOCKET;
+        pollfd.reqevents = APR_POLLIN;
+        pollfd.desc.s = ap_get_conn_socket(state->r->connection);
+        apr_pollset_add(pollset, &pollfd);
+
         /* Allow the plugin to now write to the client */
         state->obb = obb;
         apr_thread_mutex_unlock(state->mutex);
 
-        while ((framing_state != DATA_FRAMING_CLOSE) &&
-               ((block_size =
-                 mod_websocket_read_block(r, (char *)block,
-                                          sizeof(block))) > 0)) {
+        while ((framing_state != DATA_FRAMING_CLOSE)) {
             apr_int64_t block_offset = 0;
+            apr_status_t rv;
+
+            do {
+                block_size = sizeof(block);
+                rv = mod_websocket_read_nonblock(r, (char *)block, 
&block_size);
+            } while (APR_STATUS_IS_EAGAIN(rv) &&
+                     apr_pollset_poll(pollset, -1, &pollcnt, &signalled) == 
APR_SUCCESS);
+
+            if (rv != APR_SUCCESS) {
+                break;
+            }
 
             while (block_offset < block_size) {
                 switch (framing_state) {
-- 
2.1.1

Reply via email to